/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*-------------------------------------------------------------------------
 * ic_udp.c
 *	   Interconnect code specific to UDP transport.
 *-------------------------------------------------------------------------
 */

#ifdef WIN32
/*
 * Need this to get WSAPoll (poll). And it
 * has to be set before any header from the Win32 API is loaded.
 */
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif

#include "postgres.h"

#include <pthread.h>

#include "access/transam.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "utils/memutils.h"
#include "utils/hsearch.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "libpq/ip.h"
#include "utils/atomic.h"
#include "utils/builtins.h"
#include "utils/debugbreak.h"
#include "utils/faultinjector.h"
#include "utils/pg_crc.h"
#include "port/pg_crc32c.h"

#include "cdb/cdbselect.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
#include "cdb/dispatcher.h"
#include "cdb/dispatcher_new.h"
#include "cdb/cdbicudpfaultinjection.h"

#include "portability/instr_time.h"

#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "pgtime.h"
#include <netinet/in.h>

#include "port.h"


#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND

/* If we have old platform sdk headers, WSAPoll() might not be there */
#ifndef POLLIN
/* Event flag definitions for WSAPoll(). */

#define POLLRDNORM	0x0100
#define POLLRDBAND	0x0200
#define POLLIN		(POLLRDNORM | POLLRDBAND)
#define POLLPRI		0x0400

#define POLLWRNORM	0x0010
#define POLLOUT		(POLLWRNORM)
#define POLLWRBAND	0x0020

#define POLLERR		0x0001
#define POLLHUP		0x0002
#define POLLNVAL	0x0004

typedef struct pollfd {

	SOCKET	fd;
	SHORT	events;
	SHORT	revents;

} WSAPOLLFD, *PWSAPOLLFD, FAR *LPWSAPOLLFD;
__control_entrypoint(DllExport)
WINSOCK_API_LINKAGE
int
WSAAPI
WSAPoll(
	IN OUT LPWSAPOLLFD fdArray,
	IN ULONG fds,
	IN INT timeout
	);
#endif

#define poll WSAPoll

/*
 * Postgres normally uses it's own custom select implementation
 * on Windows, but they haven't implemented execeptfds, which
 * we use here.  So, undef this to use the normal Winsock version
 * for now
 */
#undef select
#endif

/*
 * Retransmit backoff schedule.
 *
 * TIMEOUT(try) maps a retry count to a timeout value from timeoutArray:
 * the values grow exponentially (1, 1, 2, 4, ... 512) and are capped at
 * timeoutArray[MAX_TRY] once `try` reaches MAX_TRY.  The array
 * intentionally holds MAX_TRY + 1 (12) entries so that
 * timeoutArray[MAX_TRY] is a valid index.
 * (NOTE(review): units are not stated here — presumably milliseconds;
 * confirm against the callers of TIMEOUT.)
 */
#define MAX_TRY (11)
int
timeoutArray[] =
{
	1,
	1,
	2,
	4,
	8,
	16,
	32,
	64,
	128,
	256,
	512,
	512 /* MAX_TRY*/
};
#define TIMEOUT(try) ((try) < MAX_TRY ? (timeoutArray[(try)]) : (timeoutArray[MAX_TRY]))

/* 1/4 sec in msec */
#define RX_THREAD_POLL_TIMEOUT (250)

/*
 * Flags definitions for flag-field of UDP-messages
 *
 * We use bit operations to test these, flags are powers of two only
 */
#define UDPIC_FLAGS_RECEIVER_TO_SENDER  (1)
#define UDPIC_FLAGS_ACK					(2)
#define UDPIC_FLAGS_STOP				(4)
#define UDPIC_FLAGS_EOS					(8)
#define UDPIC_FLAGS_NAK					(16)
#define UDPIC_FLAGS_DISORDER    		(32)
#define UDPIC_FLAGS_DUPLICATE   		(64)
#define UDPIC_FLAGS_CAPACITY    		(128)

/*
 * ConnHtabBin
 *
 * A connection hash table bin.
 *
 */
typedef struct ConnHtabBin ConnHtabBin;
struct ConnHtabBin
{
	/* The connection held in this bin. */
	MotionConn *conn;
	/* Next bin in the same bucket's collision chain. */
	struct ConnHtabBin *next;
};

/*
 * ConnHashTable
 *
 * Connection hash table definition.
 *
 */
typedef struct ConnHashTable ConnHashTable;
struct ConnHashTable
{
	/* Memory context that bins are allocated in. */
	MemoryContext	cxt;
	/* Array of `size` bucket heads (chains of ConnHtabBin). */
	ConnHtabBin	**table;
	/* Number of buckets in `table`. */
	int		size;
};

/* TODO: Should figure out a way to set this hash table size. */
#define DEFAULT_CONN_HTAB_SIZE (Max((128*Gp_interconnect_hash_multiplier), 16))
/* Bucket hash for a packet header: mixes src/dst pids and dst content id. */
#define CONN_HASH_VALUE(icpkt) ((uint32)((((icpkt)->srcPid ^ (icpkt)->dstPid)) + (icpkt)->dstContentId))
/* Two headers refer to the same connection iff every routing field matches. */
#define CONN_HASH_MATCH(a, b) (((a)->motNodeId == (b)->motNodeId && \
								(a)->dstContentId == (b)->dstContentId && \
								(a)->srcContentId == (b)->srcContentId && \
								(a)->recvSliceIndex == (b)->recvSliceIndex && \
								(a)->sendSliceIndex == (b)->sendSliceIndex && \
								(a)->srcPid == (b)->srcPid &&			\
								(a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId))


/*
 * Cursor IC table definition.
 *
 * For cursor case, there may be several concurrent interconnect
 * instances on QD. The table is used to track the status of the
 * instances, which is quite useful for "ACK the past and NAK the future" paradigm.
 *
 */
#define CURSOR_IC_TABLE_SIZE (128)

/*
 * CursorICHistoryEntry
 *
 * The definition of cursor IC history entry.
 */
typedef struct CursorICHistoryEntry CursorICHistoryEntry;
struct CursorICHistoryEntry
{
	/* Interconnect instance id. */
	uint32 icId;

	/* Command id. */
	uint32 cid;

	/* Interconnect instance status.
	 * state 1 (value 1): interconnect is setup
	 * state 0 (value 0): interconnect was torn down.
	 */
	uint8 status;

	/* Next entry. */
	CursorICHistoryEntry *next;
};

/*
 * CursorICHistoryTable
 *
 * Cursor IC history table. It is a small hash table.
 */
typedef struct CursorICHistoryTable CursorICHistoryTable;
struct CursorICHistoryTable
{
	uint32 count;
	CursorICHistoryEntry *table[CURSOR_IC_TABLE_SIZE];
};

/*
 * Synchronization timeout values
 *
 * MAIN_THREAD_COND_TIMEOUT - 1/4 second in usec
 */
#define MAIN_THREAD_COND_TIMEOUT (250000)

// Now Turn off udp signal for Mac OS
#define IC_USE_PTHREAD_SYNCHRONIZATION

/*
 *  Used for synchronization between main thread (receiver) and background thread.
 *
 */
typedef struct ThreadWaitingState ThreadWaitingState;
struct ThreadWaitingState
{
	bool waiting;
	int waitingNode;
	int waitingRoute;
	int reachRoute;

	/* main_thread_waiting_query is needed to disambiguate for cursors */
	int waitingQuery;
};

/*
 * ReceiveControlInfo
 *
 * The related control information for receiving data packets.
 * Main thread (Receiver) and background thread use the information in
 * this data structure to handle data packets.
 *
 */
typedef struct ReceiveControlInfo ReceiveControlInfo;
struct ReceiveControlInfo
{
	/* Main thread waiting state. */
	ThreadWaitingState mainWaitingState;

	/*
	 * Buffers used to assemble disorder messages at receiver side.
	 */
	icpkthdr *disorderBuffer;

	/* The last interconnect instance id which is torn down. */
	uint32 lastTornIcId;

	/* Cursor history table. */
	CursorICHistoryTable cursorHistoryTable;

	/*
	 * Last distributed transaction id when SetupUDPInterconnect is called.
	 * Coupled with cursorHistoryTable, it is used to handle multiple concurrent cursor
	 * cases.
	 */
	DistributedTransactionId lastDXatId;
};

/*
 * Main thread (Receiver) and background thread use the information in
 * this data structure to handle data packets.
 */
static ReceiveControlInfo rx_control_info;


/*
 * RxBufferPool
 *
 * Receive thread buffer pool definition. The implementation of
 * receive side buffer pool is different from send side buffer pool.
 * It is because receive side buffer pool needs a ring buffer to
 * easily implement disorder message handling logic.
 */

typedef struct RxBufferPool RxBufferPool;
struct RxBufferPool
{
	/* The max number of buffers we can get from this pool. */
	int	maxCount;

	/* The number of allocated buffers */
	int count;

	/* The list of free buffers. */
	char *freeList;
};

/*
 * The buffer pool used for keeping data packets.
 *
 * maxCount is set to 1 to make sure there is always a buffer
 * for picking packets from OS buffer.
 */
static RxBufferPool rx_buffer_pool = {1, 0, NULL};

/*
 * SendBufferPool
 *
 * The send side buffer pool definition.
 *
 */
typedef struct SendBufferPool SendBufferPool;
struct SendBufferPool
{
	/* The maximal number of buffers sender can use. */
	int maxCount;

	/* The number of buffers sender already used. */
	int count;

	/* The free buffer list at the sender side. */
	ICBufferList freeList;
};

/*
 * The sender side buffer pool.
 */
static SendBufferPool snd_buffer_pool;

/*
 * SendControlInfo
 *
 * The related control information for sending data packets and handing acks.
 * Main thread use the information in this data structure to do ack handling
 * and congestion control.
 *
 */
typedef struct SendControlInfo SendControlInfo;
struct SendControlInfo
{
	/* The buffer used for accepting acks */
	icpkthdr *ackBuffer;

	/* congestion window */
	float cwnd;

	/* minimal congestion control window */
	float minCwnd;

	/* slow start threshold */
	float ssthresh;

};

/*
 * Main thread use the information in this data structure to do ack handling
 * and congestion control.
 */
static SendControlInfo snd_control_info;

#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
/*
 * UDPSignal
 * 		The udp interconnect specific implementation of timeout wait/signal mechanism.
 * 		(Only used for MacOS to avoid the bug in MacOS 10.6.x: MPP-9910).
 * 		More details are available in the functions to implement UDPSignal.
 */
typedef struct UDPSignal UDPSignal;
struct UDPSignal
{
	/* We often use the address of a pthread condition variable as signal/condition id. */
	void	*sigId;

	/* The udp socket fd to implement the mechanism. */
	int		fd;

	/* The port. */
	int32	port;

	/* The UDP socket address family */
	int 	txFamily;

	/* Address info. */
	struct sockaddr_storage peer;
	socklen_t peer_len;
};
#endif

/*
 * ICGlobalControlInfo
 *
 * Some shared control information that is used by main thread (senders, receivers, or both)
 * and the background thread.
 *
 */
typedef struct ICGlobalControlInfo ICGlobalControlInfo;
struct ICGlobalControlInfo
{
	/* The background thread handle. */
	pthread_t threadHandle;

	/* flag showing whether the thread is created. */
	bool threadCreated;

	/* The lock protecting eno field. */
	pthread_mutex_t	errorLock;
	int  eno;

	/* Keep the udp socket buffer size used. */
	uint32 socketSendBufferSize;
	uint32 socketRecvBufferSize;

	uint64 lastExpirationCheckTime;
	uint64 lastDeadlockCheckTime;

	/* Used to decide whether to retransmit for capacity based FC. */
	uint64 lastPacketSendTime;

	/* MemoryContext for UDP interconnect. */
	MemoryContext memContext;

	/*
	 * Lock and condition variable for coordination between
	 * main thread and background thread. It protects the shared data
	 * between the two thread (the connHtab, rx buffer pool and the mainWaitingState etc.).
	 */
	pthread_mutex_t lock;
	pthread_cond_t cond;

	/* Am I a sender? */
	bool isSender;

	/* Global connection htab for both sending connections and
	 * receiving connections. Protected by the lock in this data structure.
	 */
	ConnHashTable connHtab;

    /* The connection htab used to cache future packets. */
	ConnHashTable startupCacheHtab;

	/* Used by main thread to ask the background thread to exit. */
	uint32 shutdown;

#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
	UDPSignal usig;
#endif

};

/*
 * Shared control information that is used by senders, receivers and background thread.
 */
static ICGlobalControlInfo ic_control_info;

/*
 * Macro for unack queue ring, round trip time (RTT) and expiration period (RTO)
 *
 * UNACK_QUEUE_RING_SLOTS_NUM - the number of slots in the unack queue ring.
 *                              this value should be greater than or equal to 2.
 * TIMER_SPAN                 - timer period in us
 * TIMER_CHECKING_PERIOD      - timer checking period in us
 * UNACK_QUEUE_RING_LENGTH    - the whole time span of the unack queue ring
 * DEFAULT_RTT                - default rtt in us.
 * MIN_RTT                    - min rtt in us
 * MAX_RTT                    - max rtt in us
 * RTT_SHIFT_COEFFICIENT      - coefficient for RTT computation
 *
 * DEFAULT_DEV                - default round trip standard deviation
 * MAX_DEV                    - max dev
 * DEV_SHIFT_COEFFICIENT      - coefficient for DEV computation
 *
 * MAX_EXPIRATION_PERIOD      - max expiration period in us
 * MIN_EXPIRATION_PERIOD      - min expiration period in us
 * MAX_TIME_NO_TIMER_CHECKING - max time without checking timer
 * DEADLOCK_CHECKING_TIME     - deadlock checking time
 *
 * MAX_SEQS_IN_DISORDER_ACK   - max number of sequences that can be transmitted in a
 *                              disordered packet ack.
 *
 *
 * Considerations on the settings of the values:
 *
 * TIMER_SPAN and UNACK_QUEUE_RING_SLOTS_NUM define the ring period.
 * Currently, it is UNACK_QUEUE_RING_LENGTH (default 10 seconds).
 *
 * The definition of UNACK_QUEUE_RING_LENGTH is quite related to the size of
 * sender side buffer and the size we may resend in a burst for an expiration event
 * (which may overwhelm switch or OS if it is too large).
 * Thus, we do not want to send too much data in a single expiration event. Here, a
 * relatively large UNACK_QUEUE_RING_SLOTS_NUM value is used to avoid that.
 *
 * If the sender side buffer is X (MB), then on each slot,
 * there are about X/UNACK_QUEUE_RING_SLOTS_NUM. Even we have a very large sender buffer,
 * for example, 100MB, there is about 96M/2000 = 50K per slot.
 * This is fine for the OS (with buffer 2M for each socket generally) and switch.
 *
 * Note that even when the buffers are not evenly distributed in the ring and there are some packet
 * losses, the congestion control mechanism, the disorder and duplicate packet handling logic will
 * make assure the number of outstanding buffers (in unack queues) not very large.
 *
 * MIN_RTT/MAX_RTT/DEFAULT_RTT/MIN_EXPIRATION_PERIOD/MAX_EXPIRATION_PERIOD gives some heuristic values about
 * the computation of RTT and expiration period. RTT and expiration period (RTO) are not
 * constant for various kinds of hardware and workloads. Thus, they are computed dynamically.
 * But we also want to bound the values of RTT and MAX_EXPIRATION_PERIOD. It is
 * because there are some faults that may make RTT a very abnormal value. Thus, RTT and
 * expiration period are upper and lower bounded.
 *
 * MAX_SEQS_IN_DISORDER_ACK should be smaller than (MIN_PACKET_SIZE - sizeof(icpkthdr))/sizeof(uint32).
 * It is due to the limitation of the ack receive buffer size.
 *
 */
#define UNACK_QUEUE_RING_SLOTS_NUM (2000)
#define TIMER_SPAN (Gp_interconnect_timer_period * 1000) /* default: 5ms */
#define TIMER_CHECKING_PERIOD (Gp_interconnect_timer_checking_period) /* default: 20ms */
#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)

#define DEFAULT_RTT (Gp_interconnect_default_rtt * 1000) /* default: 20ms */
#define MIN_RTT (100) /* 0.1ms */
#define MAX_RTT (200 * 1000) /* 200ms */
#define RTT_SHIFT_COEFFICIENT (3) /* RTT_COEFFICIENT 1/8 (0.125) */

#define DEFAULT_DEV (0)
#define MIN_DEV MIN_RTT
#define MAX_DEV MAX_RTT
#define DEV_SHIFT_COEFFICIENT (2) /* DEV_COEFFICIENT 1/4 (0.25) */

#define MAX_EXPIRATION_PERIOD (1000 * 1000) /* 1s */
#define MIN_EXPIRATION_PERIOD (Gp_interconnect_min_rto * 1000) /* default: 20ms */

#define MAX_TIME_NO_TIMER_CHECKING (50 * 1000) /* 50ms */
#define DEADLOCK_CHECKING_TIME  (512 * 1000) /* 512ms */

#define MAX_SEQS_IN_DISORDER_ACK (4)

/*
 * UnackQueueRing
 *
 * An unacked queue ring is used to decide which packet is expired in constant time.
 *
 * Each slot of the ring represents a fixed time span, for example 1ms, and
 * each slot has a associated buffer list/queue which contains the packets
 * which will expire in the time span.
 *
 * If the current time pointer (time t) points to slot 1,
 * then slot 2 represents the time span from t + 1ms to t + 2ms.
 * When we check whether there are some packets expired, we start from the last
 * current time recorded, and resend all the packets in the queue
 * until we reach the slot that the updated current time points to.
 *
 */
typedef struct UnackQueueRing UnackQueueRing;
struct UnackQueueRing
{
	/* save the current time when we check the time wheel for expiration */
	uint64 currentTime;

	/* the slot index corresponding to current time */
	int	idx;

	/* the number of outstanding packets in unack queue ring */
	int numOutStanding;

	/* the number of outstanding packets that use the
	 * shared bandwidth in the congestion window. */
	int numSharedOutStanding;

	/* time slots */
	ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
};

/*
 * All connections in a process share this unack queue ring instance.
 */
static UnackQueueRing unack_queue_ring = {0, 0, 0};

/*
 * AckSendParam
 *
 * The prarmeters for ack sending.
 */
typedef struct AckSendParam
{
	/* header for the ack */
	icpkthdr msg;

	/* peer address for the ack */
	struct sockaddr_storage peer;
	socklen_t peer_len;
} AckSendParam;

/*
 * ICStatistics
 *
 * A structure keeping various statistics about interconnect internal.
 *
 * Note that the statistics for ic is not accurate for multiple cursor case on QD.
 *
 * totalRecvQueueSize        - receive queue size sum when main thread is trying to get a packet.
 * recvQueueSizeCountingTime - counting times when computing totalRecvQueueSize.
 * totalCapacity             - the capacity sum when packets are tried to be sent.
 * capacityCountingTime      - counting times used to compute totalCapacity.
 * totalBuffers              - total buffers available when sending packets.
 * bufferCountingTime        - counting times when compute totalBuffers.
 * retransmits               - the number of packet retransmits.
 * mismatchNum               - the number of mismatched packets received.
 * crcErrors                 - the number of crc errors.
 * sndPktNum                 - the number of packets sent by sender.
 * recvPktNum                - the number of packets received by receiver.
 * disorderedPktNum          - disordered packet number.
 * duplicatedPktNum          - duplicate packet number.
 * recvAckNum                - the number of Acks received.
 * statusQueryMsgNum         - the number of status query messages sent.
 *
 */
typedef struct ICStatistics
{
	uint64	totalRecvQueueSize;
	uint64	recvQueueSizeCountingTime;
	uint64	totalCapacity;
	uint64	capacityCountingTime;
	uint64	totalBuffers;
	uint64	bufferCountingTime;
	int32	retransmits;
	int32	startupCachedPktNum;
	int32	mismatchNum;
	int32	crcErrors;
	int32	sndPktNum;
	int32	recvPktNum;
	int32	disorderedPktNum;
	int32   duplicatedPktNum;
	int32	recvAckNum;
	int32	statusQueryMsgNum;
} ICStatistics;

/* Statistics for UDP interconnect. */
static ICStatistics ic_statistics;

/*=========================================================================
 * STATIC FUNCTIONS declarations
 */

/* Cursor IC History table related functions. */
static void initCursorICHistoryTable(CursorICHistoryTable *t);
static void addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid);
static void updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status);
static CursorICHistoryEntry *getCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void purgeCursorIcEntry(CursorICHistoryTable *t);

static void resetMainThreadWaiting(ThreadWaitingState *state);
static void setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId);

/* Background thread error handling functions. */
static void checkRxThreadError(void);
static void setRxThreadError(int eno);
static void resetRxThreadError(void);

static void getSockAddr(struct sockaddr_storage * peer, socklen_t * peer_len, const char * listenerAddr, int listenerPort);
static void setXmitSocketOptions(int txfd);
static uint32 setSocketBufferSize(int fd, int type, int expectedSize, int leastSize);
static void setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily);
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
															 Slice *sendSlice,
															 int *pOutgoingCount);
static void setupOutgoingUDPConnection(ChunkTransportState *transportStates,
									   ChunkTransportStateEntry *pEntry, MotionConn *conn);
static char *formatSockAddr(struct sockaddr *sa, char* buf, int bufsize);

/* Connection hash table functions. */
static bool initConnHashTable(ConnHashTable *ht, MemoryContext ctx);
static bool connAddHash(ConnHashTable *ht, MotionConn *conn);
static MotionConn *findConnByHeader(ConnHashTable *ht, icpkthdr *hdr);
static void destroyConnHashTable(ConnHashTable *ht);

static inline void sendAckWithParam(AckSendParam *param);
static void sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq);
static void sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt);
static void sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq);
static inline void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen);

static void putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param);
static inline void putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf);
static inline icpkthdr *getRxBufferFromFreeList(RxBufferPool *p);
static icpkthdr *getRxBuffer(RxBufferPool *p);
static void initRxBufferPool(RxBufferPool *p);

/* ICBufferList functions. */
static inline void icBufferListInitHeadLink(ICBufferLink *link);
static inline void icBufferListInit(ICBufferList *list, ICBufferListType type);
static inline bool icBufferListIsHead(ICBufferList *list, ICBufferLink *link);
static inline ICBufferLink *icBufferListFirst(ICBufferList *list);
static inline int icBufferListLength(ICBufferList *list);
static inline ICBuffer *icBufferListDelete(ICBufferList *list, ICBuffer *buf);
static inline ICBuffer *icBufferListPop(ICBufferList *list);
static void icBufferListFree(ICBufferList *list);
static inline ICBuffer *icBufferListAppend(ICBufferList *list, ICBuffer *buf);
static void icBufferListReturn(ICBufferList *list, bool inExpirationQueue);

static void SetupUDPInterconnect_Internal(EState *estate);
static inline TupleChunkListItem
RecvTupleChunkFromAnyUDP_Internal(MotionLayerState *mlStates,
						 ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 *srcRoute);
static inline TupleChunkListItem
RecvTupleChunkFromUDP_Internal(ChunkTransportState *transportStates,
					  int16		motNodeID,
					  int16		srcRoute);
static void TeardownUDPInterconnect_Internal(ChunkTransportState *transportStates,
						MotionLayerState *mlStates,
						bool forceEOS);

static void freeDisorderedPackets(MotionConn *conn);
static void checkForCancelFromQD(ChunkTransportState *pTransportStates);


static void prepareRxConnForRead(MotionConn *conn);
static TupleChunkListItem RecvTupleChunkFromAnyUDP(MotionLayerState *mlStates,
												   ChunkTransportState *transportStates,
												   int16 motNodeID,
												   int16 *srcRoute);

static TupleChunkListItem RecvTupleChunkFromUDP(ChunkTransportState *transportStates,
												int16		motNodeID,
												int16		srcRoute);
static TupleChunkListItem
receiveChunksUDP(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
				 int16 motNodeID, int16 *srcRoute, MotionConn *conn, bool inTeardown);

static void SendEosUDP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
					   int motNodeID, TupleChunkListItem tcItem);
static bool SendChunkUDP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
						 ChunkTransportStateEntry *pEntry, MotionConn * conn, TupleChunkListItem tcItem, int16 motionId);

static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID);
static bool dispatcherAYT(void);

static void *rxThreadFunc(void *arg);

static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void inline handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now);
static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry);
static void handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId);
static void handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt);
static bool handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param);
static bool handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt);
static bool handleAckForDisorderPkt(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, icpkthdr *pkt);

static inline void prepareXmit(MotionConn *conn);
static inline void addCRC(icpkthdr *pkt);
static inline bool checkCRC(icpkthdr *pkt);
static void sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn * conn);
static inline uint64 computeExpirationPeriod(MotionConn *conn, uint32 retry);

static ICBuffer *getSndBuffer(MotionConn *conn);
static void initSndBufferPool();

static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now);
static void initUnackQueueRing(UnackQueueRing *uqr);

static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
static void checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn);

static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void cleanupStartupCache(void);
static void handleCachedPackets(void);

static uint64 getCurrentTime(void);
static void initMutex(pthread_mutex_t *mutex);
static bool waitOnCondition(int timeout_us, pthread_cond_t *cond, pthread_mutex_t *mutex);

static inline void logPkt(char *prefix, icpkthdr *pkt);
static void aggregateStatistics(ChunkTransportStateEntry *pEntry);

static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);


#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
static int udpSignalTimeoutWait(UDPSignal *sig, pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts);
static bool udpSignalPoll(UDPSignal *sig, int timeout);
static bool udpSignalGet(UDPSignal *sig);
static void setupUDPSignal(UDPSignal *sig);
static void destroyUDPSignal(UDPSignal *sig);
static void udpSignal(UDPSignal *sig);
#endif

/* #define TRANSFER_PROTOCOL_STATS */

#ifdef TRANSFER_PROTOCOL_STATS
typedef enum TransProtoEvent TransProtoEvent;
enum TransProtoEvent {
	TPE_DATA_PKT_SEND,
	TPE_ACK_PKT_QUERY
};

typedef struct TransProtoStatEntry TransProtoStatEntry;
struct TransProtoStatEntry
{
	TransProtoStatEntry	*next;

	/* Basic information */
	uint32				time;
	TransProtoEvent		event;
	int					dstPid;
	uint32				seq;

	/* more attributes can be added on demand. */
	/*
	 * float			cwnd;
	 * int				capacity;
	 */
};

typedef struct TransProtoStats TransProtoStats;
struct TransProtoStats
{
	pthread_mutex_t		lock;
	TransProtoStatEntry	*head;
	TransProtoStatEntry	*tail;
	uint64				count;
	uint64				startTime;
};

static TransProtoStats trans_proto_stats = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0};

/*
 * initTransProtoStats
 * 		Initialize the transport protocol states data structures.
 */
static void
initTransProtoStats()
{
	pthread_mutex_lock(&trans_proto_stats.lock);

	while (trans_proto_stats.head) {
		TransProtoStatEntry *cur = NULL;

		cur = trans_proto_stats.head;
		trans_proto_stats.head = trans_proto_stats.head->next;

		free(cur);
		trans_proto_stats.count--;
	}

	trans_proto_stats.head = NULL;
	trans_proto_stats.tail = NULL;
	trans_proto_stats.count = 0;
	trans_proto_stats.startTime = getCurrentTime();
	pthread_mutex_unlock(&trans_proto_stats.lock);
}

/*
 * updateStats
 * 		Append one protocol-event record (event type, destination pid,
 * 		sequence number, time relative to startTime) to the global
 * 		statistics list.
 *
 * Allocation failure is silently ignored: statistics are best-effort.
 */
static void
updateStats(TransProtoEvent event, MotionConn *conn, icpkthdr *pkt)
{
	TransProtoStatEntry *entry;

	entry = (TransProtoStatEntry *) malloc(sizeof(TransProtoStatEntry));
	if (entry == NULL)
		return;

	memset(entry, 0, sizeof(*entry));

	/* Link the new entry at the tail under the stats lock. */
	pthread_mutex_lock(&trans_proto_stats.lock);

	if (trans_proto_stats.count == 0)
	{
		/* list was empty */
		trans_proto_stats.head = entry;
		trans_proto_stats.tail = entry;
	}
	else
	{
		trans_proto_stats.tail->next = entry;
		trans_proto_stats.tail = entry;
	}
	trans_proto_stats.count++;

	entry->time = getCurrentTime() - trans_proto_stats.startTime;
	entry->event = event;
	entry->dstPid = pkt->dstPid;
	entry->seq = pkt->seq;

	/* Other attributes can be added on demand
	 *	entry->cwnd = snd_control_info.cwnd;
	 *	entry->capacity = conn->capacity;
	 */

	pthread_mutex_unlock(&trans_proto_stats.lock);
}

static void
dumpTransProtoStats()
{
	char tmpbuf[32];

	snprintf(tmpbuf, 32, "%d." UINT64_FORMAT "txt", MyProcPid, getCurrentTime());
	FILE *ofile = fopen(tmpbuf, "w+");

	pthread_mutex_lock(&trans_proto_stats.lock);
	while (trans_proto_stats.head) {
		TransProtoStatEntry *cur = NULL;

		cur = trans_proto_stats.head;
		trans_proto_stats.head = trans_proto_stats.head->next;

		fprintf(ofile, "time %d event %d seq %d destpid %d\n", cur->time, cur->event, cur->seq, cur->dstPid);
		free(cur);
		trans_proto_stats.count--;
	}

	trans_proto_stats.tail = NULL;

	pthread_mutex_unlock(&trans_proto_stats.lock);

    fclose(ofile);
}

#endif /* TRANSFER_PROTOCOL_STATS */

/*
 * initCursorICHistoryTable
 * 		Initialize the cursor ic history table: zero the entry count and
 * 		empty every hash bucket.
 */
static void
initCursorICHistoryTable(CursorICHistoryTable *t)
{
	int i;

	t->count = 0;
	for (i = 0; i < CURSOR_IC_TABLE_SIZE; i++)
		t->table[i] = NULL;
}

/*
 * addCursorIcEntry
 * 		Insert a new entry (in "setup" state) for the given interconnect
 * 		instance id and command id at the head of its hash chain.
 */
static void
addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid)
{
	CursorICHistoryEntry *entry;
	MemoryContext oldCtx;
	uint32 bucket = icId % CURSOR_IC_TABLE_SIZE;

	/* Entries live in the interconnect memory context. */
	oldCtx = MemoryContextSwitchTo(ic_control_info.memContext);
	entry = palloc0(sizeof(struct CursorICHistoryEntry));
	MemoryContextSwitchTo(oldCtx);

	entry->icId = icId;
	entry->cid = cid;
	entry->status = 1;			/* state 1: interconnect is setup */

	/* Push onto the head of the bucket's chain. */
	entry->next = t->table[bucket];
	t->table[bucket] = entry;
	t->count++;

	elog(DEBUG2, "add icid %d cid %d status %d", entry->icId, entry->cid, entry->status);
}

/*
 * updateCursorIcEntry
 * 		Update the status of the cursor ic entry for a given interconnect instance id.
 *
 * There are two states for an instance of interconnect.
 * 		state 1 (value 1): interconnect is setup
 * 		state 0 (value 0): interconnect was torn down.
 *
 * If no entry with the given id exists, this is a no-op.
 */
static void
updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status)
{
	struct CursorICHistoryEntry *p;

	/*
	 * Use uint32 like addCursorIcEntry does: a uint8 index would silently
	 * truncate if CURSOR_IC_TABLE_SIZE were ever raised past 256.
	 */
	uint32 index = icId % CURSOR_IC_TABLE_SIZE;

	for (p = t->table[index]; p; p = p->next)
	{
		if (p->icId == icId)
		{
			p->status = status;
			return;
		}
	}
	/* not found */
}

/*
 * getCursorIcEntry
 * 		Look up the cursor history entry for the given interconnect id.
 *
 * Returns NULL if no entry with that id exists.
 */
static CursorICHistoryEntry *
getCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	struct CursorICHistoryEntry *p;

	/*
	 * uint32, not uint8: matches addCursorIcEntry and stays correct if
	 * CURSOR_IC_TABLE_SIZE ever exceeds 256.
	 */
	uint32 index = icId % CURSOR_IC_TABLE_SIZE;

	for (p = t->table[index]; p; p = p->next)
	{
		if (p->icId == icId)
			return p;
	}

	/* not found */
	return NULL;
}

/*
 * pruneCursorIcEntry
 * 		Remove (and pfree) all entries whose interconnect id is older
 * 		than — strictly less than — the given prune point.
 */
static void
pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	/*
	 * int, not uint8: a uint8 counter would wrap around and loop forever
	 * if CURSOR_IC_TABLE_SIZE were ever raised past 256.
	 */
	int index;

	for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++)
	{
		struct CursorICHistoryEntry *p, *q;

		p = t->table[index];
		q = NULL;				/* q trails p: last entry kept in this chain */
		while (p)
		{
			/*	remove an entry if it is older than the prune-point */
			if (p->icId < icId)
			{
				struct CursorICHistoryEntry *trash;

				/* unlink from the chain head or from q's next pointer */
				if (!q)
				{
					t->table[index] = p->next;
				}
				else
				{
					q->next = p->next;
				}

				trash = p;

				/* set up next loop */
				p = trash->next;
				pfree(trash);

				t->count--;
			}
			else
			{
				q = p;
				p = p->next;
			}
		}
	}
}

/*
 * purgeCursorIcEntry
 *		Remove and pfree every entry in the cursor ic history table.
 *
 * NOTE(review): t->count is not reset here; callers appear to
 * reinitialize the table before reuse — confirm.
 */
static void
purgeCursorIcEntry(CursorICHistoryTable *t)
{
	/*
	 * int, not uint8: a uint8 counter would wrap around and loop forever
	 * if CURSOR_IC_TABLE_SIZE were ever raised past 256.
	 */
	int index;

	for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++)
	{
		struct CursorICHistoryEntry *trash;

		while (t->table[index])
		{
			trash = t->table[index];
			t->table[index] = trash->next;

			pfree(trash);
		}
	}
}

/*
 * resetMainThreadWaiting
 * 		Reset main thread waiting state.
 *
 * Clears the waiting flag and restores all wait targets to their
 * "not waiting on anything" sentinels.
 */
static void
resetMainThreadWaiting(ThreadWaitingState *state)
{
	state->waiting = false;
	state->waitingQuery = -1;
	state->waitingNode = -1;
	state->reachRoute = ANY_ROUTE;
	state->waitingRoute = ANY_ROUTE;
}

/*
 * setMainThreadWaiting
 * 		Set main thread waiting state.
 *
 * Records which motion node / route / interconnect instance the main
 * thread is now blocked on; reachRoute starts as ANY_ROUTE until the
 * rx thread reports a route with data.
 */
static void
setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId)
{
	state->waiting = true;
	state->waitingQuery = icId;
	state->waitingNode = motNodeId;
	state->reachRoute = ANY_ROUTE;
	state->waitingRoute = route;
}

/*
 * checkRxThreadError
 * 		Check whether there was error in the background thread in main thread.
 *
 * 	If error found, report it.
 *
 * Reads ic_control_info.eno under errorLock; on error, errno is set so
 * that %m in the report expands to the saved error, the lock is released,
 * and ereport(ERROR) performs the non-local exit.
 */
static void
checkRxThreadError()
{
	int saved_eno;

	pthread_mutex_lock(&ic_control_info.errorLock);
	saved_eno = ic_control_info.eno;

	if (saved_eno != 0)
	{
		/* let %m pick up the background thread's errno */
		errno = saved_eno;
		pthread_mutex_unlock(&ic_control_info.errorLock);

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect encountered an error"),
						errdetail("%m%s", "in receive background thread,")));
	}

	pthread_mutex_unlock(&ic_control_info.errorLock);
}

/*
 * setRxThreadError
 * 		Set the error no in background thread.
 *
 * 	Record the error in background thread. Main thread checks the errors periodically.
 * 	If main thread will find it, main thread will handle it.
 *
 * Only the FIRST error is kept: later errors are usually consequences of
 * the first one and would mask the root cause.
 */
static void
setRxThreadError(int eno)
{
	pthread_mutex_lock(&ic_control_info.errorLock);

	if (ic_control_info.eno == 0)
	{
		/* first error wins */
		ic_control_info.eno = eno;
		write_log("Interconnect error: in background thread, set ic_control_info.eno to %d, rx_buffer_pool.count %d, rx_buffer_pool.maxCount %d", eno, rx_buffer_pool.count, rx_buffer_pool.maxCount);
	}

	pthread_mutex_unlock(&ic_control_info.errorLock);
}

/*
 * resetRxThreadError
 * 		Reset the error no.
 *
 * Clears the saved background-thread error code under errorLock so a new
 * query starts with a clean slate.
 */
static void
resetRxThreadError()
{
	pthread_mutex_lock(&ic_control_info.errorLock);

	ic_control_info.eno = 0;

	pthread_mutex_unlock(&ic_control_info.errorLock);
}

/*
 * setupUDPListeningSocket
 * 		Setup udp listening socket.
 *
 * On success: *listenerSocketFd is the bound non-blocking datagram socket,
 * *listenerPort is the kernel-chosen port (host byte order), and *txFamily
 * is the address family (AF_INET/AF_INET6) we bound with.
 *
 * Fix: the previous version leaked the getaddrinfo() result on the
 * "no address succeeded" error path -- freeaddrinfo() was only reached
 * after a successful bind.  getaddrinfo memory is plain malloc, not
 * palloc, so nothing else would reclaim it after ereport(ERROR).
 */
static void
setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily)
{
	int					errnoSave;
	int					fd = -1;
	const char		   *fun;

	/*
	 * At the moment, we don't know which of IPv6 or IPv4 is wanted,
	 * or even supported, so just ask getaddrinfo...
	 *
	 * Perhaps just avoid this and try socket with AF_INET6 and AF_INT?
	 *
	 * Most implementation of getaddrinfo are smart enough to only
	 * return a particular address family if that family is both enabled,
	 * and at least one network adapter has an IP address of that family.
	 */
	struct addrinfo hints;
	struct addrinfo *addrs, *rp;
	int  s;
	struct sockaddr_storage our_addr;
	socklen_t our_addr_len;
	char service[32];

	/* ask the kernel for an ephemeral port ("0") */
	snprintf(service,32,"%d",0);
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = AF_UNSPEC;	/* Allow IPv4 or IPv6 */
	hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
	hints.ai_flags = AI_PASSIVE;	/* For wildcard IP address */
	hints.ai_protocol = 0;			/* Any protocol */

#ifdef USE_ASSERT_CHECKING
	if (gp_udpic_network_disable_ipv6)
		hints.ai_family = AF_INET;
#endif

#ifdef __darwin__
	hints.ai_family = AF_INET; /* Due to a bug in OSX Leopard, disable IPv6 for UDP interconnect on all OSX platforms */
#endif

	fun = "getaddrinfo";
	s = getaddrinfo(NULL, service, &hints, &addrs);
	if (s != 0)
		elog(ERROR, "getaddrinfo says %s", gai_strerror(s));

	/*
	 * getaddrinfo() returns a list of address structures,
	 * one for each valid address and family we can use.
	 *
	 * Try each address until we successfully bind.
	 * If socket (or bind) fails, we (close the socket
	 * and) try the next address.  This can happen if
	 * the system supports IPv6, but IPv6 is disabled from
	 * working, or if it supports IPv6 and IPv4 is disabled.
	 */

	/*
	 * If there is both an AF_INET6 and an AF_INET choice,
	 * we prefer the AF_INET6, because on UNIX it can receive either
	 * protocol, whereas AF_INET can only get IPv4.  Otherwise we'd need
	 * to bind two sockets, one for each protocol.
	 *
	 * Why not just use AF_INET6 in the hints?	That works perfect
	 * if we know this machine supports IPv6 and IPv6 is enabled,
	 * but we don't know that.
	 */

#ifndef __darwin__
#ifdef HAVE_IPV6
	if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
	{
		/*
		 * We got both an INET and INET6 possibility, but we want to prefer the INET6 one if it works.
		 * Reverse the order we got from getaddrinfo so that we try things in our preferred order.
		 * If we got more possibilities (other AFs??), I don't think we care about them, so don't
		 * worry if the list is more that two, we just rearrange the first two.
		 */
		struct addrinfo *temp = addrs->ai_next;		/* second node */
		addrs->ai_next = addrs->ai_next->ai_next;	/* point old first node to third node if any */
		temp->ai_next = addrs;						/* point second node to first */
		addrs = temp;								/* start the list with the old second node */
		elog(DEBUG1,"Have both IPv6 and IPv4 choices");
	}
#endif
#endif

	for (rp = addrs; rp != NULL; rp = rp->ai_next)
	{
		fun = "socket";
		/*
		 * getaddrinfo gives us all the parameters for the socket() call
		 * as well as the parameters for the bind() call.
		 */
		elog(DEBUG1,"receive socket ai_family %d ai_socktype %d ai_protocol %d",rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (fd == -1)
			continue;
		elog(DEBUG1,"receive socket %d ai_family %d ai_socktype %d ai_protocol %d",fd,rp->ai_family, rp->ai_socktype, rp->ai_protocol);

		fun = "fcntl(O_NONBLOCK)";
		if (!pg_set_noblock(fd))
		{
			if (fd >= 0)
				closesocket(fd);
			continue;
		}

		fun = "bind";
		elog(DEBUG1,"bind addrlen %d fam %d",rp->ai_addrlen,rp->ai_addr->sa_family);
		if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0)
		{
			*txFamily = rp->ai_family;
			break;					/* Success */
		}

		if (fd >= 0)
			closesocket(fd);
	}

	if (rp == NULL)
	{
		/*
		 * No address succeeded.  Release the address list before bailing
		 * out; this path previously leaked it.
		 */
		freeaddrinfo(addrs);
		goto error;
	}

	freeaddrinfo(addrs);		   /* No longer needed */

	/*
	 * Get our socket address (IP and Port), which we will save for others to connected to.
	 */
	MemSet(&our_addr, 0, sizeof(our_addr));
	our_addr_len = sizeof(our_addr);

	fun = "getsockname";
	if (getsockname(fd, (struct sockaddr *) &our_addr, &our_addr_len) < 0)
		goto error;

	Assert(our_addr.ss_family == AF_INET || our_addr.ss_family == AF_INET6 );

	*listenerSocketFd = fd;

	if (our_addr.ss_family == AF_INET6)
		*listenerPort = ntohs(((struct sockaddr_in6 *)&our_addr)->sin6_port);
	else
		*listenerPort = ntohs(((struct sockaddr_in *)&our_addr)->sin_port);

	setXmitSocketOptions(fd);

	return;

error:
	errnoSave = errno;
	if (fd >= 0)
		closesocket(fd);
	errno = errnoSave;
	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					errmsg("Interconnect Error: Could not set up udp listener socket."),
					errdetail("%m%s", fun)));
	return;
}

/*
 * InitMutex
 * 		Initialize mutex.
 *
 * Uses PTHREAD_MUTEX_ERRORCHECK so that relocking or unlocking an unowned
 * mutex returns an error instead of deadlocking/corrupting state.
 *
 * Fix: the attribute object is now destroyed after use.  POSIX requires
 * pthread_mutexattr_destroy() for every pthread_mutexattr_init(); the old
 * code leaked the (potentially allocated) attribute state.
 */
static void
initMutex(pthread_mutex_t *mutex)
{
	pthread_mutexattr_t m_atts;
	pthread_mutexattr_init(&m_atts);
	pthread_mutexattr_settype(&m_atts, PTHREAD_MUTEX_ERRORCHECK);

	pthread_mutex_init(mutex, &m_atts);

	/* the mutex keeps its own copy of the attributes; release ours */
	pthread_mutexattr_destroy(&m_atts);
}

/*
 * InitMotionUDP
 * 		Initialize UDP specific comms, and create rx-thread.
 *
 * Out parameters: *listenerSocketFd and *listenerPort receive the bound
 * UDP listening socket and its port (filled by setupUDPListeningSocket).
 *
 * Initialization order: global control state and locks first, then the
 * hash tables and listening socket, then the rx/tx control buffers, and
 * finally the background receive thread.  The thread is created LAST so
 * that everything it touches is already set up.
 */
void
InitMotionUDP(int *listenerSocketFd, uint16 *listenerPort)
{
	int pthread_err;
	int txFamily = -1;

	/* attributes of the thread we're creating */
	pthread_attr_t t_atts;
	MemoryContext old;

	/* Initialize global ic control data. */
	ic_control_info.eno = 0;
	ic_control_info.isSender = false;
	/* default socket buffer sizes; overwritten by setXmitSocketOptions below */
	ic_control_info.socketSendBufferSize = 2 * 1024 * 1024;
	ic_control_info.socketRecvBufferSize = 2 * 1024 * 1024;
	ic_control_info.memContext = AllocSetContextCreate(TopMemoryContext,
                                         "UdpInterconnectMemContext",
                                         ALLOCSET_DEFAULT_MINSIZE,
                                         ALLOCSET_DEFAULT_INITSIZE,
                                         ALLOCSET_DEFAULT_MAXSIZE);
	initMutex(&ic_control_info.errorLock);
	initMutex(&ic_control_info.lock);
	pthread_cond_init(&ic_control_info.cond, NULL);
	ic_control_info.shutdown = 0;

	old = MemoryContextSwitchTo(ic_control_info.memContext);

	/*
	 * connHtab is backed by the memory context, so initConnHashTable cannot
	 * fail for it (palloc0 throws on OOM); startupCacheHtab uses malloc
	 * (shared with the rx thread), so its return value must be checked.
	 */
	initConnHashTable(&ic_control_info.connHtab, ic_control_info.memContext);
	if (!initConnHashTable(&ic_control_info.startupCacheHtab, NULL))
		ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
					errmsg("failed to initialize connection htab for startup cache")));

	/*
	 * setup listening socket.
	 */
	setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily);

#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
	setupUDPSignal(&ic_control_info.usig);
#endif

	/* Initialize receive control data. */
	resetMainThreadWaiting(&rx_control_info.mainWaitingState);

	/* allocate a buffer for sending disorder messages */
	rx_control_info.disorderBuffer = palloc0(MIN_PACKET_SIZE);
	rx_control_info.lastDXatId = InvalidTransactionId;
	rx_control_info.lastTornIcId = 0;
	initCursorICHistoryTable(&rx_control_info.cursorHistoryTable);

	initRxBufferPool(&rx_buffer_pool);

	/* Initialize send control data */
	snd_control_info.cwnd = 0;
	snd_control_info.minCwnd = 0;
	snd_control_info.ackBuffer = palloc0(MIN_PACKET_SIZE);

	MemoryContextSwitchTo(old);

#ifdef TRANSFER_PROTOCOL_STATS
	initMutex(&trans_proto_stats.lock);
#endif

	/* Start up our rx-thread */

	/* save ourselves some memory: the defaults for thread stack
	 * size are large (1M+) */
	pthread_attr_init(&t_atts);

#ifdef pg_on_solaris
	/* Solaris doesn't have PTHREAD_STACK_MIN ? */
	pthread_attr_setstacksize(&t_atts, (128*1024));
#else
	pthread_attr_setstacksize(&t_atts, Max(PTHREAD_STACK_MIN, (128*1024)));
#endif
	pthread_err = pthread_create(&ic_control_info.threadHandle, &t_atts, rxThreadFunc, NULL);

	pthread_attr_destroy(&t_atts);
	if (pthread_err != 0)
	{
		/* record that there is no thread to join during cleanup */
		ic_control_info.threadCreated = false;
		ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR),
						errmsg("InitMotionLayerIPC: failed to create thread"),
						errdetail("pthread_create() failed with err %d", pthread_err)));
	}

	ic_control_info.threadCreated = true;
	return;
}

/*
 * CleanupMotionUDP
 * 		Clean up UDP specific stuff such as cursor ic hash table, thread etc.
 *
 * Teardown order matters: first stop the rx thread (unlock, set shutdown
 * flag, join), and only then free the structures the thread may touch
 * (startup cache, buffers, memory context).
 */
void
CleanupMotionUDP(void)
{
	elog(DEBUG2, "udp-ic: telling receiver thread to shutdown.");

	/*
	 * We should not hold any lock when we reach here even
	 * when we report FATAL errors. Just in case,
	 * We still release the locks here.
	 */
	pthread_mutex_unlock(&ic_control_info.errorLock);
	pthread_mutex_unlock(&ic_control_info.lock);

	/* Shutdown rx thread. */
	compare_and_swap_32(&ic_control_info.shutdown, 0, 1);

	if(ic_control_info.threadCreated)
	{
		/* wait for the rx thread to observe the shutdown flag and exit */
		pthread_join(ic_control_info.threadHandle, NULL);
	}

	elog(DEBUG2, "udp-ic: receiver thread shutdown.");

	purgeCursorIcEntry(&rx_control_info.cursorHistoryTable);

	destroyConnHashTable(&ic_control_info.connHtab);

	/* background thread exited, we can do the cleanup without locking. */
	cleanupStartupCache();
	destroyConnHashTable(&ic_control_info.startupCacheHtab);

	/* free the disorder buffer */
	pfree(rx_control_info.disorderBuffer);
	rx_control_info.disorderBuffer = NULL;

	/* free the buffer for acks */
	pfree(snd_control_info.ackBuffer);
	snd_control_info.ackBuffer = NULL;

	MemoryContextDelete(ic_control_info.memContext);

#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
	destroyUDPSignal(&ic_control_info.usig);
#endif

#ifdef USE_ASSERT_CHECKING
	/*
	 * Check malloc times, in Interconnect part, memory are carefully released in tear down
	 * code (even when error occurred). But if a FATAL error is reported, tear down
	 * code will not be executed. Thus, it is still possible the malloc times and free times
	 * do not match when we reach here. The process will die in this case, the mismatch does
	 * not introduce issues.
	 */
	if (icudp_malloc_times != 0)
		elog(LOG, "WARNING: malloc times and free times do not match.");
#endif
}

/*
 * waitOnCondition
 *		Used by sender/receiver to wait some time.
 *
 *	MUST BE CALLED WITH *mutex* HELD!
 */
static bool
waitOnCondition(int timeout_us, pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec ts;
	int wait;

	Assert(timeout_us >= 0);
	/*
	 * MPP-9910: pthread_cond_timedwait appears to be broken in OS-X 10.6.x "Snow Leopard"
	 * Let's use a different timewait function that works better on OSX (and is simpler
	 * because it uses relative time)
	 */
#ifdef __darwin__
	ts.tv_sec = 0;
	ts.tv_nsec = 1000 * timeout_us;
#else
	{
		struct timeval tv;

		gettimeofday(&tv, NULL);
		ts.tv_sec = tv.tv_sec;
		/*	leave in ms for this */
		ts.tv_nsec = (tv.tv_usec + timeout_us);
		if (ts.tv_nsec >= 1000000)
		{
			ts.tv_sec++;
			ts.tv_nsec -= 1000000;
		}
		ts.tv_nsec *= 1000; /* convert usec to nsec */
	}
#endif

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		elog(DEBUG5, "waiting (timed) on route %d %s", rx_control_info.mainWaitingState.waitingRoute,
			 (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE ? "(any route)" : ""));
	}


	/*
	 * interrupts may occurs when we are waiting. the interrupt handler
	 * only set some flags. Only when interrupt checking function is called,
	 * the interrupts are handled.
	 *
	 * We should pay attention to the fact that in elog/erreport/write_log,
	 * interrupts are checked.
	 *
	 */

#if defined(__darwin__)
#if (defined(IC_USE_PTHREAD_SYNCHRONIZATION))
	wait = pthread_cond_timedwait_relative_np(cond, mutex, &ts);
#else
	wait = udpSignalTimeoutWait(&ic_control_info.usig, cond, mutex, &ts);
#endif
#else
	wait = pthread_cond_timedwait(cond, mutex, &ts);
#endif

	if (wait == ETIMEDOUT)
	{
		/* condition not met */
		return false;
	}

	/* we didn't time out, condition met! */
	return true;
}

/*
 * initConnHashTable
 * 		Initialize a connection hash table.
 *
 * When cxt is non-NULL the bucket array lives in that memory context
 * (allocation failure throws); with a NULL cxt we use plain malloc so the
 * table can be shared with the rx thread, and return false on OOM.
 */
static bool
initConnHashTable(ConnHashTable *ht, MemoryContext cxt)
{
	int		i;
	size_t	nbytes;

	ht->cxt = cxt;
	ht->size = DEFAULT_CONN_HTAB_SIZE;
	nbytes = ht->size * sizeof(struct ConnHtabBin *);

	if (ht->cxt != NULL)
	{
		ht->table = (struct ConnHtabBin **) palloc0(nbytes);
	}
	else
	{
		ht->table = (struct ConnHtabBin **) malloc(nbytes);
		if (ht->table == NULL)
			return false;
	}

	/* every bucket chain starts out empty */
	for (i = 0; i < ht->size; i++)
		ht->table[i] = NULL;

	return true;
}

/*
 * connAddHash
 * 		Add a connection to the hash table
 *
 * Note: we want to add a connection to the hashtable if it isn't
 * already there ... so we just have to check the pointer values -- no
 * need to use CONN_HASH_MATCH() at all!
 *
 * Returns false *only* on memory-allocation failure (malloc path).
 */
static bool
connAddHash(ConnHashTable *ht, MotionConn *conn)
{
	uint32 bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;
	struct ConnHtabBin *bin;
	struct ConnHtabBin *newbin;

	/* duplicate check: pointer identity is sufficient here */
	for (bin = ht->table[bucket]; bin != NULL; bin = bin->next)
	{
		if (bin->conn == conn)
		{
			elog(DEBUG5, "connAddHash(): duplicate ?! node %d route %d", conn->conn_info.motNodeId, conn->route);
			return true; /* false *only* indicates memory-alloc failure. */
		}
	}

	if (ht->cxt)
	{
		MemoryContext old = MemoryContextSwitchTo(ht->cxt);

		newbin = (struct ConnHtabBin *) palloc0(sizeof(struct ConnHtabBin));
		MemoryContextSwitchTo(old);
	}
	else
	{
		newbin = (struct ConnHtabBin *) malloc(sizeof(struct ConnHtabBin));
		if (newbin == NULL)
			return false;
	}

	/* push the new bin onto the head of the bucket chain */
	newbin->conn = conn;
	newbin->next = ht->table[bucket];
	ht->table[bucket] = newbin;

	return true;
}

/*
 * connDelHash
 * 		Delete a connection from the hash table
 *
 * Note: we want to remove a connection from the hashtable if it is
 * there ... so we just have to check the pointer values -- no need to
 * use CONN_HASH_MATCH() at all!
 */
static void
connDelHash(ConnHashTable *ht, MotionConn *conn)
{
	uint32 bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;
	struct ConnHtabBin **link;

	/*
	 * Walk the chain via a pointer-to-pointer so head and interior
	 * removal are handled uniformly.
	 */
	for (link = &ht->table[bucket]; *link != NULL; link = &(*link)->next)
	{
		if ((*link)->conn == conn)
		{
			struct ConnHtabBin *trash = *link;

			/* unlink and release with the matching allocator */
			*link = trash->next;

			if (ht->cxt)
				pfree(trash);
			else
				free(trash);
			return;
		}
	}
	/* not found: nothing to delete */
}

/*
 * findConnByHeader
 * 		Find the corresponding connection given a pkt header information.
 *
 * With the new mirroring scheme, the interconnect is no longer involved:
 * we don't have to disambiguate anymore.
 *
 * NOTE: the icpkthdr field dstListenerPort is used for disambiguation.
 * on receivers it may not match the actual port (it may have an extra bit
 * set (1<<31)).
 */
static MotionConn *
findConnByHeader(ConnHashTable *ht, icpkthdr *hdr)
{
	uint32 bucket = CONN_HASH_VALUE(hdr) % ht->size;
	struct ConnHtabBin *bin;

	for (bin = ht->table[bucket]; bin != NULL; bin = bin->next)
	{
		MotionConn *candidate = bin->conn;

		if (!CONN_HASH_MATCH(&candidate->conn_info, hdr))
			continue;

		if (DEBUG5 >= log_min_messages)
			write_log("findConnByHeader: found. route %d state %d hashcode %d conn %p",
					  candidate->route, candidate->state, bucket, candidate);

		return candidate;
	}

	if (DEBUG5 >= log_min_messages)
		write_log("findConnByHeader: not found! (hdr->srcPid %d "
				  "hdr->srcContentId %d hdr->dstContentId %d hdr->dstPid %d sess(%d:%d) cmd(%d:%d)) hashcode %d",
				  hdr->srcPid, hdr->srcContentId, hdr->dstContentId, hdr->dstPid, hdr->sessionId,
				  gp_session_id, hdr->icId, gp_interconnect_id, bucket);

	return NULL;
}

/*
 * destroyConnHashTable
 * 		Release the connection hash table.
 *
 * Frees every bin and the bucket array itself, using pfree or free
 * depending on which allocator created the table.
 */
static void
destroyConnHashTable(ConnHashTable *ht)
{
	int		bucket;
	bool	fromContext = (ht->cxt != NULL);

	for (bucket = 0; bucket < ht->size; bucket++)
	{
		while (ht->table[bucket] != NULL)
		{
			struct ConnHtabBin *trash = ht->table[bucket];

			ht->table[bucket] = trash->next;

			if (fromContext)
				pfree(trash);
			else
				free(trash);
		}
	}

	if (fromContext)
		pfree(ht->table);
	else
		free(ht->table);

	/* leave the struct in a recognizably empty state */
	ht->table = NULL;
	ht->size = 0;
}

/*
 * sendControlMessage
 * 		Helper function to send a control message.
 *
 * It is different from sendOnce which retries on interrupts...
 * Here, we leave it to retransmit logic to handle these cases.
 *
 * pkt->len must already be set; the packet is sent as a single datagram
 * to the given peer address.  Short sends / failures are only logged:
 * control messages are recoverable through retransmission.
 */
static inline void
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
{
	int n;

#ifdef USE_ASSERT_CHECKING
	/* test mode: randomly drop control messages to exercise retransmit paths */
	if (testmode_inject_fault(gp_udpic_dropacks_percent))
	{
	#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW CONTROL MESSAGE with seq %d extraSeq %d srcpid %d despid %d", pkt->seq, pkt->extraSeq, pkt->srcPid, pkt->dstPid);
	#endif
		return;
	}
#endif

	/* Add CRC for the control message. */
	if (gp_interconnect_full_crc)
		addCRC(pkt);

	n = sendto(fd, (const char *)pkt, pkt->len, 0, addr, peerLen);

	/* No need to handle EAGAIN here: no-space just means that we
	 * dropped the packet: our ordinary retransmit mechanism will
	 * handle that case
	 */

	/* n < pkt->len covers both errors (n < 0) and short sends */
	if (n < pkt->len)
		write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq);
}

/*
 * setAckSendParam
 * 		Set the ack sending parameters.
 *
 * Captures everything needed to send the ack later (outside the lock):
 * the message itself and the peer address.
 */
static inline void
setAckSendParam(AckSendParam *param, MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
	/* start from the connection header, then override the ack fields */
	memcpy(&param->msg, (char *)&conn->conn_info, sizeof(icpkthdr));
	param->msg.len = sizeof(icpkthdr);
	param->msg.flags = flags;
	param->msg.extraSeq = extraSeq;
	param->msg.seq = seq;

	/* remember where the ack must be delivered */
	param->peer = conn->peer;
	param->peer_len = conn->peer_len;
}

/*
 * sendAckWithParam
 * 		Send acknowledgment to sender.
 *
 * Delivers a previously prepared ack (see setAckSendParam) through the
 * listener socket.
 */
static inline void
sendAckWithParam(AckSendParam *param)
{
	sendControlMessage(&param->msg,
					   UDP_listenerFd,
					   (struct sockaddr *) &param->peer,
					   param->peer_len);
}

/*
 * sendAck
 * 		Send acknowledgment to sender.
 *
 * Builds an ack on the stack from the connection header and sends it
 * immediately through the listener socket.
 */
static void
sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
	icpkthdr msg;

	/* clone the connection header, then fill in the ack fields */
	memcpy(&msg, (char *)&conn->conn_info, sizeof(msg));
	msg.len = sizeof(icpkthdr);
	msg.flags = flags;
	msg.seq = seq;
	msg.extraSeq = extraSeq;

#ifdef AMS_VERBOSE_LOGGING
	write_log("sendack: flags 0x%x node %d route %d seq %d extraSeq %d",
					msg.flags, msg.motNodeId, conn->route, msg.seq, msg.extraSeq);
#endif

	sendControlMessage(&msg, UDP_listenerFd, (struct sockaddr *)&conn->peer, conn->peer_len);
}

/*
 * sendDisorderAck
 *		Send a disorder message to the sender.
 *
 * Whenever the receiver detects a disorder packet, it will assemble a disorder message
 * which contains the sequence numbers of the possibly lost packets.
 *
 * The payload (lostPktCnt uint32 sequence numbers) is assumed to already
 * be in rx_control_info.disorderBuffer after the header.
 */
static void
sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt)
{
	icpkthdr *pkt = rx_control_info.disorderBuffer;

	/* base the message on the connection header */
	memcpy(pkt, (char *)&conn->conn_info, sizeof(icpkthdr));

	pkt->flags |= UDPIC_FLAGS_DISORDER;
	pkt->seq = seq;
	pkt->extraSeq = extraSeq;
	/* header plus one uint32 per possibly-lost packet */
	pkt->len = lostPktCnt * sizeof(uint32) + sizeof(icpkthdr);

#ifdef AMS_VERBOSE_LOGGING
	if (!(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6))
	{
		write_log("UDP Interconnect bug (in sendDisorderAck): trying to send ack when we don't know where to send to %s", conn->remoteHostAndPort);
	}
#endif

	sendControlMessage(pkt, UDP_listenerFd, (struct sockaddr *)&conn->peer, conn->peer_len);
}

/*
 * sendStatusQueryMessage
 *		Used by senders to send a status query message for a connection to receivers.
 *
 * When receivers get such a message, they will respond with
 * the connection status (consumed seq, received seq ...).
 */
static void
sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq)
{
	icpkthdr msg;

	/* clone the connection header and mark the packet as a capacity probe */
	memcpy(&msg, (char *)&conn->conn_info, sizeof(msg));
	msg.len = sizeof(msg);
	msg.flags = UDPIC_FLAGS_CAPACITY;
	msg.extraSeq = 0;
	msg.seq = seq;

#ifdef TRANSFER_PROTOCOL_STATS
	updateStats(TPE_ACK_PKT_QUERY, conn, &msg);
#endif

	sendControlMessage(&msg, fd, (struct sockaddr *)&conn->peer, conn->peer_len);
}

/*
 * putRxBufferAndSendAck
 * 		Return a buffer and send an acknowledgment.
 *
 *  SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * Fix: the previous version dereferenced buf (buf->seq, and buf in the
 * verbose logging) BEFORE the `buf == NULL` check, so the intended FATAL
 * report could never fire -- a NULL queue head would crash first.  The
 * pointer is now validated before any use.
 */
static void
putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param)
{
	icpkthdr   *buf = NULL;
	uint32		seq;

	buf = (icpkthdr *)conn->pkt_q[conn->pkt_q_head];

	if (buf == NULL)
	{
		/* release the lock before the non-returning FATAL report */
		pthread_mutex_unlock(&ic_control_info.lock);
		elog(FATAL, "putRxBufferAndSendAck: buffer is NULL");
	}

	seq = buf->seq;

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, buf->seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif

	/* pop the packet off the receive ring */
	conn->pkt_q[conn->pkt_q_head] = NULL;
	conn->pBuff = NULL;
	conn->pkt_q_head = (conn->pkt_q_head + 1) % Gp_interconnect_queue_depth;
	conn->pkt_q_size--;

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, buf->seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif

	putRxBufferToFreeList(&rx_buffer_pool, buf);

	conn->conn_info.extraSeq = seq;

	/* Send an Ack to the sender. */
	if ((seq % 2 == 0) || (Gp_interconnect_queue_depth == 1))
	{
		if (param != NULL)
		{
			/* defer the actual send until the caller drops the lock */
			setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
		}
		else
		{
			sendAck(conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
		}
	}
}

/*
 * MlPutRxBuffer
 *
 * The cdbmotion code has discarded our pointer to the motion-conn
 * structure, but has enough info to fully specify it.
 *
 * Looks the connection up again, returns its current rx buffer and,
 * if an ack became due, sends it after the lock is dropped.
 */
void
MlPutRxBuffer(ChunkTransportState *transportStates, int motNodeID, int route)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn;
	AckSendParam param;

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = pEntry->conns + route;

	memset(&param, 0, sizeof(param));

	pthread_mutex_lock(&ic_control_info.lock);

	if (conn->pBuff == NULL)
	{
		/* releasing a buffer we don't hold is a programming error */
		pthread_mutex_unlock(&ic_control_info.lock);
		elog(FATAL, "Interconnect error: tried to release a NULL buffer");
	}

	putRxBufferAndSendAck(conn, &param);

	pthread_mutex_unlock(&ic_control_info.lock);

	/* real ack sending is after lock release to decrease the lock holding time. */
	if (param.msg.len != 0)
		sendAckWithParam(&param);
}

/*
 * initRxBufferPool
 * 		Initialize receive buffer pool.
 *
 * Starts empty: no buffers allocated, no free list, and a provisional
 * cap of one buffer (maxCount is raised elsewhere as connections appear).
 */
static void
initRxBufferPool(RxBufferPool *p)
{
	p->freeList = NULL;
	p->count = 0;
	p->maxCount = 1;
}


/*
 * getRxBuffer
 * 		Get a receive buffer.
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static icpkthdr *
getRxBuffer(RxBufferPool *p)
{
	icpkthdr *ret = NULL;

#ifdef USE_ASSERT_CHECKING
	if (FINC_HAS_FAULT(FINC_RX_BUF_NULL) &&
		testmode_inject_fault(gp_udpic_fault_inject_percent))
		return NULL;
#endif

	do
	{
		if (p->freeList == NULL)
		{
			if (p->count > p->maxCount)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("Interconnect ran out of rx-buffers count/max %d/%d", p->count, p->maxCount);
				break;
			}

			/* malloc is used for thread safty. */
			ret = (icpkthdr *)malloc(Gp_max_packet_size);

			/*
			 * Note: we return NULL if the malloc() fails -- and the
			 * background thread will set the error. Main thread will
			 * check the error, report it and start teardown.
			 */
			if (ret != NULL)
				p->count++;

			break;
		}

		/* we have buffers available in our freelist */

		ret = getRxBufferFromFreeList(p);

	} while (0);

	return ret;
}

/*
 * putRxBufferToFreeList
 * 		Return a receive buffer to free list
 *
 *  SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * The free list is intrusive: the first pointer-sized bytes of the (now
 * unused) packet buffer are reused as the "next" link.  We store the
 * current head into the buffer and make the buffer the new head.
 */
static inline void
putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf)
{
	/* return the buffer into the free list. */
	*(char **)buf = p->freeList;
	p->freeList = (char *)buf;
}

/*
 * getRxBufferFromFreeList
 * 		Get a receive buffer from free list
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 *
 * Pops the head of the intrusive free list (see putRxBufferToFreeList):
 * the new head is the pointer stored in the first bytes of the popped
 * buffer.  Caller must ensure the list is non-empty.
 */
static inline icpkthdr*
getRxBufferFromFreeList(RxBufferPool *p)
{
	icpkthdr *buf = NULL;

	buf = (icpkthdr *) p->freeList;
	p->freeList = *(char **) (p->freeList);
	return buf;
}

/*
 * freeRxBuffer
 * 		Free a receive buffer.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 *
 * Releases the buffer back to the OS (not to the free list) and
 * decrements the pool's allocated-buffer count accordingly.
 */
static inline void
freeRxBuffer(RxBufferPool *p, icpkthdr *buf)
{
	free(buf);
	p->count--;
}

/*
 * setSocketBufferSize
 * 		Set socket buffer size.
 *
 * Tries to set the given socket buffer (SO_SNDBUF/SO_RCVBUF) to
 * expectedSize, halving on each setsockopt failure; errors out once the
 * candidate drops below leastSize.  Returns the size actually set.
 * On error the socket is closed and an ERROR is reported.
 */
static uint32
setSocketBufferSize(int fd, int type, int expectedSize, int leastSize)
{
	int			bufSize;
	int			errnoSave;
	socklen_t	skLen = sizeof(bufSize);
	const char *fun = "getsockopt";

	/* log the OS default before we touch anything */
	if (getsockopt(fd, SOL_SOCKET, type, (char *)&bufSize, &skLen) < 0)
		goto error;

	elog(DEBUG1, "UDP-IC: xmit default buffer size %d bytes", bufSize);

	/*
	 * We'll try the expected size first, and fall back to least size if that doesn't work.
	 */
	fun = "setsockopt";
	bufSize = expectedSize;
	while (setsockopt(fd, SOL_SOCKET, type, (const char *)&bufSize, skLen) < 0)
	{
		bufSize >>= 1;
		if (bufSize < leastSize)
			goto error;
	}

	elog(DEBUG1, "UDP-IC: xmit use buffer size %d bytes", bufSize);

	return bufSize;

error:
	errnoSave = errno;
	if (fd >= 0)
		closesocket(fd);
	errno = errnoSave;
	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					errmsg("Interconnect Error: Could not set up udp listener socket."),
					errdetail("%m%s", fun)));
	/* Make GCC not complain. */
	return 0;
}

/*
 * setXmitSocketOptions
 * 		Set transmit socket options.
 *
 * The Gp_udp_bufsize_k guc should be set carefully.
 *
 * If it is small, such as 128K, and send queue depth and receive queue depth are large,
 * then it is possible OS can not handle all of the UDP packets GPDB delivered to it.
 * OS will introduce a lot of packet losses and disordered packets.
 *
 * In order to set Gp_udp_bufsize_k to a larger value, the OS UDP buffer should be set to
 * a large enough value.
 */
static void
setXmitSocketOptions(int txfd)
{
	uint32 bufSize;

	/* GUC override if set, otherwise a 2MB default; floor is 128KB */
	if (Gp_udp_bufsize_k != 0)
		bufSize = Gp_udp_bufsize_k * 1024;
	else
		bufSize = 2048 * 1024;

	ic_control_info.socketRecvBufferSize = setSocketBufferSize(txfd, SO_RCVBUF, bufSize, 128 * 1024);
	ic_control_info.socketSendBufferSize = setSocketBufferSize(txfd, SO_SNDBUF, bufSize, 128 * 1024);
}

#ifdef USE_ASSERT_CHECKING

/*
 * icBufferListLog
 * 		Log the buffer list.
 *
 * Dumps the list header, then each node's link pointer and packet.
 * Iteration stops at the head link or after list->length nodes,
 * whichever comes first, so a corrupted list cannot loop forever.
 */
static void
icBufferListLog(ICBufferList *list)
{
	ICBufferLink *cursor;
	int remaining;
	int nodeno = 0;

	write_log("Length %d, type %d headptr %p", list->length, list->type, &list->head);

	for (cursor = list->head.next, remaining = list->length;
		 cursor != &list->head && remaining > 0;
		 cursor = cursor->next, remaining--)
	{
		ICBuffer *buf = (list->type == ICBufferListType_Primary
						 ? GET_ICBUFFER_FROM_PRIMARY(cursor)
						 : GET_ICBUFFER_FROM_SECONDARY(cursor));

		write_log("Node %d, linkptr %p", nodeno++, cursor);
		logPkt("from list", buf->pkt);
	}
}

/*
 * icBufferListCheck
 * 		Buffer list sanity check.
 *
 * Validates the doubly-linked ring invariants: non-NULL list, non-negative
 * length, an empty list pointing back at its own head, and that following
 * `length` next-links returns to the head.  On any violation the list is
 * logged and the process sleeps 120s (to allow attaching a debugger /
 * collecting state) before abort().
 */
static void
icBufferListCheck(char * prefix, ICBufferList *list)
{
	if (list == NULL)
	{
		write_log("ICBufferList ERROR %s: NULL list", prefix);
		goto error;
	}
	if (list->length < 0)
	{
		write_log("ICBufferList ERROR %s: list length %d < 0 ", prefix, list->length);
		goto error;
	}

	/* an empty ring must have head.prev == head.next == &head */
	if (list->length == 0 && (list->head.prev != list->head.next && list->head.prev != &list->head))
	{
		write_log("ICBufferList ERROR %s: length is 0, &list->head %p, prev %p, next %p", prefix, &list->head, list->head.prev, list->head.next);
		icBufferListLog(list);
		goto error;
	}

	int len = list->length;

	/* walk exactly `length` links; we must land back on the head */
	ICBufferLink *link = list->head.next;
	while (len > 0)
	{
		link = link->next;
		len--;
	}

	if (link != &list->head)
	{
		write_log("ICBufferList ERROR: %s len %d", prefix, list->length);
		icBufferListLog(list);
		goto error;
	}

	return;

error:
	write_log("wait for 120s and then abort.");
	pg_usleep(120000000);
	abort();
}
#endif

/*
 * icBufferListInitHeadLink
 * 		Initialize the pointers in the head link to point to itself.
 *
 * An empty ring is represented by the head linking to itself in both
 * directions.
 */
static inline void
icBufferListInitHeadLink(ICBufferLink *link)
{
	link->next = link;
	link->prev = link;
}

/*
 * icBufferListInit
 * 		Initialize the buffer list with the given type.
 *
 * Produces an empty ring of the requested link type (primary/secondary).
 */
static inline void
icBufferListInit(ICBufferList *list, ICBufferListType type)
{
	list->length = 0;
	list->type = type;
	icBufferListInitHeadLink(&list->head);

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListInit", list);
#endif
}

/*
 * icBufferListIsHead
 * 		Return whether the given link is the head link of the list.
 *
 * 	This function is often used as the end condition of an iteration of the list.
 */
static inline bool
icBufferListIsHead(ICBufferList *list, ICBufferLink *link)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListIsHead", list);
#endif
	/* True when the iteration has wrapped back to the pseudo head link. */
	return &list->head == link;
}

/*
 * icBufferListFirst
 * 		Return the first link after the head link.
 *
 * 	Note that the head link is a pseudo link used only to ease the operations on the linked list.
 * 	If the list only contains the head link, this function will return the head link.
 */
static inline ICBufferLink *
icBufferListFirst(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListFirst", list);
#endif
	/* For an empty list this is the head link itself. */
	return list->head.next;
}

/*
 * icBufferListLength
 * 		Get the list length.
 */
static inline int
icBufferListLength(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListLength", list);
#endif
	/* Length is maintained incrementally by append/delete/pop. */
	return list->length;
}

/*
 * icBufferListDelete
 *		Remove a buffer from the buffer list and return the buffer.
 */
static inline ICBuffer *
icBufferListDelete(ICBufferList *list, ICBuffer *buf)
{
	ICBufferLink *link;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListDelete", list);
#endif

	/* A buffer embeds two links; pick the one this list threads through. */
	link = (list->type == ICBufferListType_Primary) ? &buf->primary : &buf->secondary;

	/* Unsplice the link from the doubly linked circular list. */
	link->prev->next = link->next;
	link->next->prev = link->prev;
	list->length--;

	return buf;
}

/*
 * icBufferListPop
 * 		Remove the head buffer from the list.
 */
static inline ICBuffer *
icBufferListPop(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListPop", list);
#endif

	/* Nothing to pop from an empty list. */
	if (list->length == 0)
		return NULL;

	ICBufferLink *first = icBufferListFirst(list);
	ICBuffer *buf = (list->type == ICBufferListType_Primary)
		? GET_ICBUFFER_FROM_PRIMARY(first)
		: GET_ICBUFFER_FROM_SECONDARY(first);

	/* Unsplice the first link from the circular list. */
	first->prev->next = first->next;
	first->next->prev = first->prev;
	list->length--;

	return buf;
}

/*
 * icBufferListFree
 * 		Free all the buffers in the list.
 */
static void
icBufferListFree(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListFree", list);
#endif

	/* Drain the list, releasing every buffer's memory. */
	for (;;)
	{
		ICBuffer *buf = icBufferListPop(list);

		if (buf == NULL)
			break;
		pfree(buf);
	}
}

/*
 * icBufferListAppend
 * 		Append a buffer to a list.
 */
static inline ICBuffer *
icBufferListAppend(ICBufferList *list, ICBuffer *buf)
{
	ICBufferLink *link;
	ICBufferLink *head = &list->head;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListAppend", list);
#endif

	/* A buffer embeds two links; pick the one this list threads through. */
	link = (list->type == ICBufferListType_Primary) ? &buf->primary : &buf->secondary;

	/* Splice in just before the head, i.e. at the tail of the list. */
	link->prev = head->prev;
	link->next = head;
	head->prev->next = link;
	head->prev = link;

	list->length++;

	return buf;
}

/*
 * icBufferListReturn
 * 		Return the buffers in the list to the free buffer list.
 *
 * If the buf is also in an expiration queue, we also need to remove it from the expiration queue.
 *
 */
static void
icBufferListReturn(ICBufferList *list, bool inExpirationQueue)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListReturn", list);
#endif
	ICBuffer *buf = NULL;

	while ((buf = icBufferListPop(list)) != NULL )
	{
		if (inExpirationQueue) /* the buf is also in the expiration queue */
		{
			/* Unlink the buffer from its unack-queue-ring slot as well. */
			icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
			unack_queue_ring.numOutStanding--;
			/*
			 * The length is checked *after* the pop, so this decrement fires
			 * for every popped buffer except the final one -- presumably only
			 * one outstanding buffer per connection counts as non-shared.
			 * NOTE(review): confirm against the accounting done at send time.
			 */
			if (icBufferListLength(list) >= 1)
				unack_queue_ring.numSharedOutStanding--;
		}

		/* Hand the buffer back to the shared free pool. */
		icBufferListAppend(&snd_buffer_pool.freeList, buf);
	}
}

/*
 * initUnackQueueRing
 *		Initialize an unack queue ring.
 *
 *	Align current time to a slot boundary and set current slot index (time pointer) to 0.
 */
static void
initUnackQueueRing(UnackQueueRing *uqr)
{
	int slot;

	/* Align the ring's notion of "now" down to a TIMER_SPAN boundary. */
	uqr->currentTime = getCurrentTime();
	uqr->currentTime -= uqr->currentTime % TIMER_SPAN;

	/* Time pointer starts at slot 0 with nothing outstanding. */
	uqr->idx = 0;
	uqr->numOutStanding = 0;
	uqr->numSharedOutStanding = 0;

	for (slot = 0; slot < UNACK_QUEUE_RING_SLOTS_NUM; slot++)
		icBufferListInit(&uqr->slots[slot], ICBufferListType_Secondary);
}

/*
 * computeExpirationPeriod
 * 		Compute expiration period according to the connection information.
 *
 * Considerations on expiration period computation:
 *
 * RTT is dynamically computed, and expiration period is based on RTT values.
 * We cannot simply use RTT as the expiration value, since real workload does
 * not always have a stable RTT. A small constant value is multiplied to the RTT value
 * to make the resending logic insensitive to the frequent small changes of RTT.
 *
 */
static inline uint64
computeExpirationPeriod(MotionConn *conn, uint32 retry)
{
#ifdef USE_ASSERT_CHECKING
	/*
	 * In fault injection mode we often use DEFAULT_RTT: the intentionally
	 * large packet/ack loss rate inflates the measured RTT, which would make
	 * retransmission too slow.  Real hardware/workloads do not show such a
	 * loss pattern.
	 */
	if (udp_testmode)
		return DEFAULT_RTT;
#endif

	{
		/* Cap the exponential-backoff shift at 12 to avoid overshifting. */
		uint32 factor = (retry <= 12 ? retry : 12);

		/* Base period is RTT plus four deviations, doubled per retry. */
		return Max(MIN_EXPIRATION_PERIOD,
				   Min(MAX_EXPIRATION_PERIOD,
					   (conn->rtt + (conn->dev << 2)) << (factor)));
	}
}

/*
 * initSndBufferPool
 * 		Initialize the send buffer pool.
 *
 * The initial maxCount is set to 1 for gp_interconnect_snd_queue_depth = 1 case,
 * then there is at least an extra free buffer to send for that case.
 */
static void
initSndBufferPool(SendBufferPool *p)
{
	p->count = 0;
	/* Allow one pre-allocated buffer when the queue depth is exactly 1. */
	p->maxCount = (Gp_interconnect_snd_queue_depth == 1) ? 1 : 0;
	icBufferListInit(&p->freeList, ICBufferListType_Primary);
}

/*
 * cleanSndBufferPool
 * 		Clean the send buffer pool.
 */
static inline void
cleanSndBufferPool(SendBufferPool *p)
{
	/* Release all pooled buffers and reset the counters. */
	icBufferListFree(&p->freeList);
	p->maxCount = 0;
	p->count = 0;
}

/*
 * getSndBuffer
 * 		Get a send buffer for a connection.
 *
 *  Different flow control mechanisms use different buffer management policies.
 *  Capacity based flow control uses per-connection buffer policy and Loss based
 *  flow control uses shared buffer policy.
 *
 * 	Return NULL when no free buffer available.
 */
static ICBuffer *
getSndBuffer(MotionConn *conn)
{
	ICBuffer *buf;

	/* Sample the number of available buffers for statistics reporting. */
	ic_statistics.totalBuffers += (icBufferListLength(&snd_buffer_pool.freeList) + snd_buffer_pool.maxCount - snd_buffer_pool.count);
	ic_statistics.bufferCountingTime++;

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
	{
		/* Capacity based flow control: enforce the per-connection depth. */
		int queued = icBufferListLength(&conn->unackQueue) + icBufferListLength(&conn->sndQueue);

		Assert(queued <= Gp_interconnect_snd_queue_depth);
		if (queued >= Gp_interconnect_snd_queue_depth)
			return NULL;
	}

	/* Prefer reusing an already-allocated free buffer. */
	if (icBufferListLength(&snd_buffer_pool.freeList) > 0)
		return icBufferListPop(&snd_buffer_pool.freeList);

	/* Grow the pool only while under its configured maximum. */
	if (snd_buffer_pool.count >= snd_buffer_pool.maxCount)
		return NULL;

	buf = (ICBuffer *) palloc0(Gp_max_packet_size + sizeof(ICBuffer));
	snd_buffer_pool.count++;

	buf->conn = NULL;
	buf->nRetry = 0;
	buf->unackQueueRingSlot = 0;
	icBufferListInitHeadLink(&buf->primary);
	icBufferListInitHeadLink(&buf->secondary);

	return buf;
}

/*
 *  The udp interconnect specific implementation of timeout wait/signal mechanism.
 *  (Only for MacOS)
 *
 * The introduction of this is due to the bug in pthread_cond_wait/pthread_cond_timedwait_relative_np
 * on MacOs. (MPP-9910).
 *
 * The implementation of the signal mechanism is based on UDP protocol. Waiting thread is polling on
 * a UDP socket, and wakes up thread will send a signal id to the socket when the condition is met.
 *
 * Due to the reliability of UDP protocol (packet loss, duplicate, interrupted system calls, error
 * return of system calls...), the waiting thread should:
 *
 * 1) check the condition again when it is waken up
 * 2) when the time wait return with timeout, it should check the condition again.
 *
 * It is not necessary to implement a retransmit/ack mechanism because the local socket is relatively
 * reliable and the communication load is light.
 *
 * ##NOTE: This implementation is specific to the UDP interconnect and is not portable. Developers
 * should take special care when adding another condition variable on top of it.
 */

#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
/*
 * udpSignalTimeoutWait
 * 		Timeout wait implementation on Mac.
 *
 * Return 0 on success (condition met) and return ETIMEDOUT on timeout/other errors.
 *
 * Note that this implementation is UDP interconnect specific and not for general usage.
 * It depends on the udp socket built in InitMotionUDP.
 *
 * Can only be used in Main thread.
 *
 */
static int
udpSignalTimeoutWait(UDPSignal *sig, pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts)
{
	Assert(sig != NULL && mutex != NULL && ts != NULL);

	/*
	 * Convert the relative timeout to milliseconds for the poll call.
	 * NOTE(review): only ts->tv_nsec is used; ts->tv_sec appears to be
	 * ignored -- confirm all callers pass sub-second timeouts.
	 */
	int timeout = ts->tv_nsec/1000/1000;

	Assert(timeout >= 0);

	pthread_mutex_unlock(mutex);

	/* The condition variable's address doubles as the signal id to wait for. */
	sig->sigId = (void *)cond;

	/*
	 * Fix: ret was previously declared bool, which coerced ETIMEDOUT to 1
	 * (true), so callers comparing the return value against ETIMEDOUT never
	 * matched.  It must be an int to carry the real error code.
	 */
	int ret = ETIMEDOUT;

	if (udpSignalPoll(sig, timeout))
	{
		/* Woken up: verify the condition's signal actually arrived. */
		if (udpSignalGet(sig))
			ret = 0;
	}
	sig->sigId = NULL;
	pthread_mutex_lock(mutex);
	return ret;
}

/*
 * udpSignal
 *
 * The udp interconnect specific implementation of pthread_cond_signal.
 *
 */
static void
udpSignal(UDPSignal *sig)
{
	int n;
	char buf[16];

#ifdef USE_ASSERT_CHECKING
	/* In test mode, randomly drop signals to exercise the timeout path. */
	int percent = gp_udpic_dropacks_percent > 0 ? 1 : 0;
	if (testmode_inject_fault(percent))
	{
	#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW SIGNAL with value %p", sig->sigId);
	#endif
		return;
	}
#endif

xmit_retry:

	/* The datagram payload is just the signal id (a pointer value). */
	*((void **)buf) = sig->sigId;
	n = sendto(sig->fd, buf, sizeof(sig->sigId), 0,
			   (struct sockaddr *)&sig->peer, sig->peer_len);

	if (n < 0)
	{
		if (errno == EINTR)
			goto xmit_retry;

		if (errno == EAGAIN) /* no space ? not an error. */
			return;

		/* Error case, this may happen in both main thread and background thread,
		 * treat it like in background thread. Finally, main thread will find this error.
		 */
		write_log("udpsignal failed fd %d: got error %d errno %d signal %p", sig->fd, n, errno, sig->sigId);
		setRxThreadError(errno);
		return;
		/* not reached */
	}

	/*
	 * Fix: compare against the number of bytes actually requested,
	 * sizeof(sig->sigId) (a pointer), not sizeof(int).  On LP64 platforms a
	 * fully successful 8-byte send used to be logged as a failure here.
	 */
	if (n != sizeof(sig->sigId))
		write_log("udpsignal failed fd %d: got error %d errno %d signal %p", sig->fd, n, errno, sig->sigId);

}

/*
 * setupUDPSignal
 * 		Setup the socket needed by the signal.
 *
 * Can only be used in Main thread.
 */
static void
setupUDPSignal(UDPSignal *sig)
{
	uint16 listenerPort;

	Assert(sig != NULL);

	/* Bind a fresh UDP socket dedicated to the wait/wakeup protocol. */
	setupUDPListeningSocket(&sig->fd, &listenerPort, &sig->txFamily);
	sig->port = listenerPort;
	sig->sigId = NULL;

	/* The peer is ourselves: signals are delivered over loopback. */
	getSockAddr(&sig->peer, &sig->peer_len, "127.0.0.1", listenerPort);

	Assert(sig->peer.ss_family == AF_INET || sig->peer.ss_family == AF_INET6);
}

/*
 * destroyUDPSignal
 * 		Destroy the signal.
 *
 * Can only be used in Main thread.
 */
static void
destroyUDPSignal(UDPSignal *sig)
{
	Assert(sig != NULL);

	/* Close the socket (if open) and reset all fields to their idle state. */
	if (sig->fd >= 0)
		closesocket(sig->fd);

	sig->fd = -1;
	sig->port = 0;
	sig->sigId = NULL;
}

/*
 * udpSignalGet
 * 		Try to get the signal from the socket.
 *
 * Note: Can only be called from main thread.
 */
static bool
udpSignalGet(UDPSignal *sig)
{
#define SIGNAL_BUFFER_SIZE 16
	char buf[SIGNAL_BUFFER_SIZE];
	struct sockaddr_storage peer;
	socklen_t peerlen;
	int n;

	/* Drain datagrams until we see our signal id or the socket is empty. */
	for (;;)
	{
		peerlen = sizeof(peer);
		n = recvfrom(sig->fd, buf, SIGNAL_BUFFER_SIZE, 0, (struct sockaddr *)&peer, &peerlen);

		if (n < 0)
		{
			/* Non-blocking socket with nothing queued. */
			if (errno == EWOULDBLOCK)
				return false;

			/* Interrupted system call: retry the receive. */
			if (errno == EINTR)
				continue;

			elog(ERROR, "Interconnect error: getting signal from socket buffer failed.");

			/* not reached. */
			return false;
		}

		/* Discard malformed datagrams that do not carry a full signal id. */
		if (n != sizeof(sig->sigId))
			continue;

		/* Only the signal id we are currently waiting for counts. */
		if (*((void **)buf) == sig->sigId)
		{
		#ifdef AMS_VERBOSE_LOGGING
			write_log("Get signal %p", *((void **)buf));
		#endif
			return true;
		}
	}

	/* not reached */
	return false;
}

/*
 * udpSignalPoll
 * 		Timeout (in ms) polling of signal packets.
 *
 *
 * Note: Can only be called from main thread.
 */
static bool
udpSignalPoll(UDPSignal *sig, int timeout)
{
	struct pollfd nfd;
	int n;

	nfd.fd = sig->fd;
	nfd.events = POLLIN;
	nfd.revents = 0;

	n = poll(&nfd, 1, timeout);

	if (n < 0)
	{
		/* Interrupted: report "no signal" and let the caller re-check. */
		if (errno == EINTR)
			return false;

		elog(ERROR, "Interconnect error: signal polling failed.");
		/* not reached */
	}

	/* timeout */
	if (n == 0)
	{
		return false;
	}

	/*
	 * Fix: test revents (what actually happened) rather than events (the
	 * mask we asked for).  The old check `nfd.events & POLLIN` was always
	 * true whenever n == 1, so error conditions such as POLLERR/POLLHUP
	 * were misreported as a readable signal.
	 */
	if (n == 1 && (nfd.revents & POLLIN))
	{
		return true;
	}

	return false;
}
#endif


/*
 * startOutgoingUDPConnections
 * 		Used to initially kick-off any outgoing connections for mySlice.
 *
 * This should not be called for root slices (i.e. QD ones) since they don't
 * ever have outgoing connections.
 *
 * PARAMETERS
 *
 *	 sendSlice	- Slice that this process is a member of.
 *
 * RETURNS
 *	 Initialized ChunkTransportState for the Sending Motion Node Id.
 */
ChunkTransportStateEntry *
startOutgoingUDPConnections(ChunkTransportState *transportStates,
							Slice		*sendSlice,
							int			*pOutgoingCount)
{
	ChunkTransportStateEntry *pEntry;
	MotionConn *conn;
	ListCell   *cell;
	Slice	   *recvSlice;
	CdbProcess *cdbProc;
	int					i;
	uint16 port = 0;

	*pOutgoingCount = 0;

	/* The receiving slice is our parent in the slice tree. */
	recvSlice = (Slice *) list_nth(transportStates->sliceTable->slices, sendSlice->parentIndex);

	/*
	 * Potentially introduce a Bug (MPP-17186).
	 * The workaround is to turn off log_hostname guc.
	 */
	adjustMasterRouting(recvSlice);

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect seg%d slice%d setting up sending motion node",
			 GetQEIndex(), sendSlice->sliceIndex);

	/* One connection slot per primary process of the receiving slice. */
	pEntry = createChunkTransportState(transportStates,
									   sendSlice,
									   recvSlice,
									   list_length(recvSlice->primaryProcesses));

	Assert(pEntry && pEntry->valid);
	/*
	 * Setup a MotionConn entry for each of our outbound connections.
	 * Request a connection to each receiving backend's listening port.
	 * NB: Some mirrors could be down & have no CdbProcess entry.
	 */
	conn = pEntry->conns;

	i = 0;
	foreach(cell, recvSlice->primaryProcesses)
	{
		cdbProc = (CdbProcess *)lfirst(cell);
		if (cdbProc)
		{
			conn->cdbProc = cdbProc;
			icBufferListInit(&conn->sndQueue, ICBufferListType_Primary);
			icBufferListInit(&conn->unackQueue, ICBufferListType_Primary);
			conn->capacity = Gp_interconnect_queue_depth;

			/* send buffer pool must be initialized before this. */
			snd_buffer_pool.maxCount += Gp_interconnect_snd_queue_depth;
			snd_control_info.cwnd += 1;
			conn->curBuff = getSndBuffer(conn);

			/* should have at least one buffer for each connection */
			Assert(conn->curBuff != NULL);

			/* Seed RTT/deviation estimates; refined as acks arrive. */
			conn->rtt = DEFAULT_RTT;
			conn->dev = DEFAULT_DEV;
			conn->deadlockCheckBeginTime = 0;
			conn->tupleCount = 0;
			/* First message sent is the connection header itself. */
			conn->msgSize = sizeof(conn->conn_info);
			conn->sentSeq = 0;
			conn->receivedAckSeq = 0;
			conn->consumedSeq = 0;
			conn->pBuff = (uint8 *)conn->curBuff->pkt;
			conn->state = mcsSetupOutgoingConnection;
			conn->route = i++;

			conn->waitEOS = false;

			(*pOutgoingCount)++;
		}

		conn++;
	}

	/* One shared transmit socket for all connections of this entry. */
	pEntry->txfd = -1;
	pEntry->txport = 0;
	setupUDPListeningSocket(&pEntry->txfd, &port, &pEntry->txfd_family);
	pEntry->txport = port;

	return pEntry;

}


/*
 * getSockAddr
 * 		Convert IP addr and port to sockaddr
 */
/*
 * Converts listenerAddr/listenerPort into a sockaddr in *peer (with its
 * length in *peer_len).  No name resolution is performed; the address must
 * already be numeric.  Raises ERROR when the address cannot be parsed.
 */
static void
getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort)
{
	int			ret;
	char		portNumberStr[32];
	char	   *service;
	struct addrinfo *addrs = NULL;
	struct addrinfo hint;

	/*
	 * Get socketaddr to connect to.
	 */

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_DGRAM; /* UDP */
	hint.ai_family = AF_UNSPEC; /* Allow for any family (v4, v6, even unix in the future)  */
#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;  /* Never do name resolution */
#else
	hint.ai_flags = AI_NUMERICHOST;  /* Never do name resolution */
#endif

	snprintf(portNumberStr, sizeof(portNumberStr), "%d", listenerPort);
	service = portNumberStr;

	ret = pg_getaddrinfo_all(listenerAddr, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);

		/*
		 * Fix: the two adjacent string literals previously concatenated to
		 * "...remote listeneraddress..." -- a space was missing.
		 */
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			errmsg("Interconnect Error: Could not parse remote listener "
				   "address: '%s' port '%d': %s", listenerAddr,listenerPort,gai_strerror(ret)),
			errdetail("getaddrinfo() unable to parse address: '%s'",
					  listenerAddr)));
		return;					/* not reached: ereport(ERROR) does not return */
	}
	/* Since we aren't using name resolution, getaddrinfo will return only 1 entry */

	elog(DEBUG1,"GetSockAddr socket ai_family %d ai_socktype %d ai_protocol %d for %s ",addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol, listenerAddr);
	memset(peer, 0, sizeof(struct sockaddr_storage));
	memcpy(peer, addrs->ai_addr, addrs->ai_addrlen);
	*peer_len = addrs->ai_addrlen;

	pg_freeaddrinfo_all(addrs->ai_family, addrs);
}

/*
 * setupOutgoingUDPConnection
 *		Setup outgoing UDP connection.
 */
void
setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	CdbProcess		   *cdbProc = conn->cdbProc;

	Assert(conn->state == mcsSetupOutgoingConnection);
	Assert(conn->cdbProc);

	conn->wakeup_ms = 0;
	conn->remoteContentId = cdbProc->contentid;
	conn->stat_min_ack_time = ~((uint64)0);

	/* Save the information for the error message if getaddrinfo fails */
	if (strchr(cdbProc->listenerAddr,':') != 0)
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
					 "[%s]:%d", cdbProc->listenerAddr, cdbProc->listenerPort);
	else
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
			 "%s:%d", cdbProc->listenerAddr, cdbProc->listenerPort);

	/*
	 * Get socketaddr to connect to.
	 */
	getSockAddr(&conn->peer, &conn->peer_len, cdbProc->listenerAddr, cdbProc->listenerPort);

	/* Save the destination IP address */
	formatSockAddr((struct sockaddr *)&conn->peer, conn->remoteHostAndPort,
					sizeof(conn->remoteHostAndPort));

	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6 );

	{
#ifdef USE_ASSERT_CHECKING
		/* Sanity check: the tx socket's recorded family matches reality. */
		{
			struct sockaddr_storage source_addr;
			socklen_t source_addr_len;
			MemSet(&source_addr, 0, sizeof(source_addr));
			source_addr_len = sizeof(source_addr);

			if (getsockname(pEntry->txfd, (struct sockaddr *) &source_addr, &source_addr_len) == -1)
			{
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg("Interconnect Error: Could not get port from socket."),
							errdetail("%m")));
			}
			Assert(pEntry->txfd_family == source_addr.ss_family);
		}
#endif
		/*
		 * If the socket was created with a different address family than the place we
		 * are sending to, we might need to do something special.
		 */
		if (pEntry->txfd_family != conn->peer.ss_family)
		{
			/*
			 * If the socket was created AF_INET6, but the address we want to send to is IPv4 (AF_INET),
			 * we might need to change the address format.  On Linux, it isn't necessary:  glibc automatically
			 * handles this.  But on MAC OSX and Solaris, we need to convert the IPv4 address to an
			 * V4-MAPPED address in AF_INET6 format.
			 */
			if (pEntry->txfd_family == AF_INET6)
			{
				struct sockaddr_storage temp;
				const struct sockaddr_in *in = (const struct sockaddr_in *)&conn->peer;
				struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *)&temp;
				memset(&temp, 0, sizeof(temp));

				elog(DEBUG1, "We are inet6, remote is inet.  Converting to v4 mapped address.");

				/* Construct a V4-to-6 mapped address.  */
				temp.ss_family = AF_INET6;
				in6_new->sin6_family = AF_INET6;
				in6_new->sin6_port = in->sin_port;
				in6_new->sin6_flowinfo = 0;

				/* ::ffff:a.b.c.d -- bytes 10-11 are 0xffff, bytes 12-15 the v4 addr */
				memset (&in6_new->sin6_addr, '\0', sizeof (in6_new->sin6_addr));
				//in6_new->sin6_addr.s6_addr16[5] = 0xffff;
				((uint16 *)&in6_new->sin6_addr)[5] = 0xffff;
				//in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
				memcpy(((char *)&in6_new->sin6_addr)+12,&(in->sin_addr),4);
				in6_new->sin6_scope_id = 0;

				/* copy it back */
				memcpy(&conn->peer,&temp,sizeof(struct sockaddr_in6));
				conn->peer_len = sizeof(struct sockaddr_in6);
			}
			else
			{
				/*
				 * If we get here, something is really wrong.  We created the socket as IPv4-only (AF_INET),
				 * but the address we are trying to send to is IPv6.  It's possible we could have a V4-mapped
				 * address that we could convert to an IPv4 address, but there is currently no code path where
				 * that could happen.  So this must be an error.
				 */
				elog(ERROR, "Trying to use an IPv4 (AF_INET) socket to send to an IPv6 address");
			}
		}
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("Interconnect connecting to seg%d slice%d %s "
								"pid=%d sockfd=%d",
								conn->remoteContentId,
								pEntry->recvSlice->sliceIndex,
								conn->remoteHostAndPort,
								conn->cdbProc->pid,
								conn->sockfd)));

	/* send connection request */
	MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
	conn->conn_info.len = 0;
	conn->conn_info.flags = 0;
	conn->conn_info.motNodeId = pEntry->motNodeId;

	conn->conn_info.recvSliceIndex = pEntry->recvSlice->sliceIndex;
	conn->conn_info.sendSliceIndex = pEntry->sendSlice->sliceIndex;
	conn->conn_info.srcContentId = GetQEIndex();
	conn->conn_info.dstContentId = conn->cdbProc->contentid;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "setupOutgoingUDPConnection: node %d route %d srccontent %d dstcontent %d: %s",
			 pEntry->motNodeId, conn->route, GetQEIndex(), conn->cdbProc->contentid, conn->remoteHostAndPort);

	/* Our own listener port lives in the high 16 bits of Gp_listener_port. */
	conn->conn_info.srcListenerPort = (Gp_listener_port>>16) & 0x0ffff;
	conn->conn_info.srcPid = MyProcPid;
	conn->conn_info.dstPid = conn->cdbProc->pid;
	conn->conn_info.dstListenerPort = conn->cdbProc->listenerPort;

	conn->conn_info.sessionId = gp_session_id;
	conn->conn_info.icId = gp_interconnect_id;

	/* Register so the rx thread can match incoming acks to this connection. */
	connAddHash(&ic_control_info.connHtab, conn);

	/*
	 * No need to get the connection lock here, since background rx thread will never access send connections.
	 */
	conn->msgPos = NULL;
	conn->msgSize = sizeof(conn->conn_info);
	conn->stillActive = true;
	conn->conn_info.seq = 1;
	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6 );

}								/* setupOutgoingUDPConnection */

/*
 * checkForCancelFromQD
 * 		Check for cancel from QD.
 *
 * Should be called only inside the dispatcher
 */
static void
checkForCancelFromQD(ChunkTransportState *pTransportStates)
{
	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(pTransportStates);
	Assert(pTransportStates->estate);

	/* Either dispatcher flavor reporting an error means contact was lost. */
	if (pTransportStates->estate->dispatch_data &&
		dispatcher_has_error(pTransportStates->estate->dispatch_data))
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg(CDB_MOTION_LOST_CONTACT_STRING)));

	if (pTransportStates->estate->mainDispatchData &&
		mainDispatchHasError(pTransportStates->estate->mainDispatchData))
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg(CDB_MOTION_LOST_CONTACT_STRING)));
}

/*
 * handleCachedPackets
 * 		Deal with cached packets.
 */
static void
handleCachedPackets(void)
{
	MotionConn *cachedConn = NULL;
	MotionConn *setupConn = NULL;
	ConnHtabBin *bin = NULL;
	icpkthdr *pkt = NULL;
	AckSendParam param;
	int	i = 0;
	int j = 0;

	/* Walk every bucket of the startup cache hash table. */
	for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
	{
		bin = ic_control_info.startupCacheHtab.table[i];

		while (bin)
		{
			/* NOTE: comma operator -- this is two assignments in one statement. */
			cachedConn = bin->conn,
			setupConn = NULL;

			/* Re-deliver each packet cached before the connection existed. */
			for (j = 0; j < cachedConn->pkt_q_size; j++)
			{
				pkt = (icpkthdr *) cachedConn->pkt_q[j];

				if (pkt == NULL)
					continue;

				rx_buffer_pool.maxCount--;

				/* look up this pkt's connection in connHtab */
				setupConn = findConnByHeader(&ic_control_info.connHtab, pkt);
				if (setupConn == NULL)
				{
					/* mismatch! */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
					cachedConn->pkt_q[j] = NULL;
					continue;
				}

				memset(&param, 0, sizeof(param));
				if (!handleDataPacket(setupConn, pkt, &cachedConn->peer, &cachedConn->peer_len, &param))
				{
					/* no need to cache this packet */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
				}

				ic_statistics.recvPktNum++;
				/* handleDataPacket may have prepared an ack for us to send. */
				if (param.msg.len != 0)
					sendAckWithParam(&param);

				cachedConn->pkt_q[j] = NULL;
			}
			/* Advance before freeing: connDelHash/free invalidate cachedConn's bin. */
			bin = bin->next;
			connDelHash(&ic_control_info.startupCacheHtab, cachedConn);

			/* MPP-19981
			 * free the cached connections; otherwise memory leak
			 * would be introduced.
			 */
			free(cachedConn->pkt_q);
			free(cachedConn);
		}
	}
}

/*
 * SetupUDPInterconnect_Internal
 * 		Internal function for setting up UDP interconnect.
 */
static void
SetupUDPInterconnect_Internal(EState *estate)
{
	int			i, n;
	ListCell   *cell;
	Slice	   *mySlice;
	Slice	   *aSlice;
	MotionConn *conn=NULL;
	int			incoming_count = 0;
	int			outgoing_count = 0;
	int			expectedTotalIncoming = 0;
	int			expectedTotalOutgoing = 0;

	ChunkTransportStateEntry *sendingChunkTransportState = NULL;

	/* Serialize with the background rx thread while mutating shared state. */
	pthread_mutex_lock(&ic_control_info.lock);

	gp_interconnect_id = estate->es_sliceTable->ic_instance_id;

	Assert(gp_interconnect_id > 0);

	estate->interconnect_context = palloc0(sizeof(ChunkTransportState));

	/* add back-pointer for dispatch check. */
	estate->interconnect_context->estate = estate;

	/* initialize state variables */
	Assert(estate->interconnect_context->size == 0);
	estate->interconnect_context->size = CTS_INITIAL_SIZE;
	estate->interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));

	estate->interconnect_context->teardownActive = false;
	estate->interconnect_context->activated = false;
	estate->interconnect_context->incompleteConns = NIL;
	estate->interconnect_context->sliceTable = NULL;
	estate->interconnect_context->sliceId = -1;

	estate->interconnect_context->sliceTable = estate->es_sliceTable;

	estate->interconnect_context->sliceId = LocallyExecutingSliceIndex(estate);

	/* Install the UDP-specific implementations of the transport entry points. */
	estate->interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromUDP;
	estate->interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyUDP;
	estate->interconnect_context->SendEos = SendEosUDP;
	estate->interconnect_context->SendChunk = SendChunkUDP;
	estate->interconnect_context->doSendStopMessage = doSendStopMessageUDP;

	mySlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, LocallyExecutingSliceIndex(estate));

	Assert(mySlice &&
		   IsA(mySlice, Slice) &&
		   mySlice->sliceIndex == LocallyExecutingSliceIndex(estate));

#ifdef USE_ASSERT_CHECKING
	/* Any fault-injection GUC being set switches on udp test mode. */
	if (gp_udpic_dropseg != UNDEF_SEGMENT
			|| gp_udpic_dropacks_percent != 0
			|| gp_udpic_dropxmit_percent != 0
			|| gp_udpic_fault_inject_percent != 0)
		udp_testmode = true;
	else
		udp_testmode = false;
#endif

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		DistributedTransactionId distTransId = 0;
		TransactionId localTransId = 0;
		TransactionId subtransId = 0;

	    GetAllTransactionXids(&(distTransId),
	                          &(localTransId),
	                          &(subtransId));

	    /*
	     * Prune only when we are not in the same transaction and there is a large number
	     * of entries in the table
	     */
	    if (distTransId != rx_control_info.lastDXatId && rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE))
	    {
	    	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	    		elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, gp_interconnect_id);
	    	pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id);
	    }

		addCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id, gp_command_count);

		/* save the latest transaction id. */
		rx_control_info.lastDXatId = distTransId;
	}

	/* now we'll do some setup for each of our Receiving Motion Nodes. */
	foreach(cell, mySlice->children)
	{
		int			numProcs;
		int			childId = lfirst_int(cell);
		ChunkTransportStateEntry *pEntry=NULL;
		int numValidProcs = 0;

		aSlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, childId);
		numProcs = list_length(aSlice->primaryProcesses);

    	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
    		elog(DEBUG1, "Setup recving connections: my slice %d, childId %d",
    			 mySlice->sliceIndex, childId);

		pEntry = createChunkTransportState(estate->interconnect_context, aSlice, mySlice, numProcs);

		Assert(pEntry);
		Assert(pEntry->valid);

		for (i=0; i < pEntry->numConns; i++)
		{
			conn = &pEntry->conns[i];
			conn->cdbProc = list_nth(aSlice->primaryProcesses, i);

			/* Some mirrors may be down and have no CdbProcess entry. */
			if (conn->cdbProc)
			{
				numValidProcs++;

				/* update the max buffer count of our rx buffer pool.  */
				rx_buffer_pool.maxCount += Gp_interconnect_queue_depth;

				/* rx_buffer_queue */
				conn->pkt_q_size = 0;
				conn->pkt_q_head = 0;
				conn->pkt_q_tail = 0;
				conn->pkt_q = (uint8 **) palloc0(Gp_interconnect_queue_depth * sizeof(uint8 *));

				/* connection header info (defining characteristics of this connection) */
				MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
				conn->route = i;

				conn->conn_info.seq = 1;
				conn->stillActive = true;

				incoming_count++;

				conn->conn_info.motNodeId = pEntry->motNodeId;
				conn->conn_info.recvSliceIndex = mySlice->sliceIndex;
				conn->conn_info.sendSliceIndex = aSlice->sliceIndex;

				conn->conn_info.srcContentId = conn->cdbProc->contentid;
				conn->conn_info.dstContentId = GetQEIndex();

				conn->conn_info.srcListenerPort = conn->cdbProc->listenerPort;
				conn->conn_info.srcPid = conn->cdbProc->pid;
				conn->conn_info.dstPid = MyProcPid;
				conn->conn_info.dstListenerPort = (Gp_listener_port>>16) & 0x0ffff;
				conn->conn_info.sessionId = gp_session_id;
				conn->conn_info.icId = gp_interconnect_id;
				conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER;

				/* Register so the rx thread can route packets to this conn. */
				connAddHash(&ic_control_info.connHtab, conn);
			}
		}

		expectedTotalIncoming += numValidProcs;

		/* let cdbmotion know how many receivers to expect. */
		setExpectedReceivers(estate->motionlayer_context, childId, numValidProcs);
	}

	snd_control_info.cwnd = 0;
	snd_control_info.minCwnd = 0;
	snd_control_info.ssthresh = 0;

	/* Initiate outgoing connections. */
	if (mySlice->parentIndex != -1)
	{
		initSndBufferPool(&snd_buffer_pool);
		initUnackQueueRing(&unack_queue_ring);
		ic_control_info.isSender = true;
		ic_control_info.lastExpirationCheckTime = getCurrentTime();
		ic_control_info.lastPacketSendTime = ic_control_info.lastExpirationCheckTime;
		ic_control_info.lastDeadlockCheckTime = ic_control_info.lastExpirationCheckTime;

		sendingChunkTransportState = startOutgoingUDPConnections(estate->interconnect_context, mySlice, &expectedTotalOutgoing);
		n = sendingChunkTransportState->numConns;

		for (i = 0; i < n; i++)
		{						/* loop to set up outgoing connections */
			conn = &sendingChunkTransportState->conns[i];

			setupOutgoingUDPConnection(estate->interconnect_context, sendingChunkTransportState, conn);
			outgoing_count++;
			continue;
		}
		/* Congestion-control bootstrap: floor is the initial cwnd. */
		snd_control_info.minCwnd = snd_control_info.cwnd;
		snd_control_info.ssthresh = snd_buffer_pool.maxCount;

	#ifdef TRANSFER_PROTOCOL_STATS
		initTransProtoStats();
	#endif

	}
	else
	{
		/* Root (QD) slice: never sends, only receives. */
		ic_control_info.isSender = false;
		ic_control_info.lastExpirationCheckTime = 0;
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("SetupUDPInterconnect will activate "
								"%d incoming, %d outgoing routes for gp_interconnect_id %d. "
								"Listening on ports=%d/%d sockfd=%d.",
								expectedTotalIncoming, expectedTotalOutgoing, gp_interconnect_id,
								Gp_listener_port&0x0ffff, (Gp_listener_port>>16)&0x0ffff, UDP_listenerFd)));

	/* If there are packets cached by background thread, add them to the connections. */
	if (gp_interconnect_cache_future_packets)
		handleCachedPackets();

	estate->interconnect_context->activated = true;

	pthread_mutex_unlock(&ic_control_info.lock);
}

/*
 * SetupUDPInterconnect
 * 		setup UDP interconnect.
 */
void
SetupUDPInterconnect(EState *estate)
{
	if (estate->interconnect_context)
	{
		elog(FATAL, "SetupUDPInterconnect: already initialized.");
	}
	else if (!estate->es_sliceTable)
	{
		elog(FATAL, "SetupUDPInterconnect: no slice table ?");
	}

	PG_TRY();
	{
		/*
		 * The rx-thread might have set an error since last teardown,
		 * technically it is not part of current query, discard it directly.
		 */
		resetRxThreadError();

		SetupUDPInterconnect_Internal(estate);

        /* Internal error if we locked the mutex but forgot to unlock it. */
        /*
         * NOTE(review): the unlock call is a side effect inside Assert(), so
         * this sanity check disappears entirely in non-assert builds, and in
         * assert builds it unlocks an already-unlocked mutex (UB for default
         * mutexes on some platforms) -- confirm whether an error-checking
         * mutex attribute is in use.
         */
        Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/* On error, make sure the interconnect lock is released before rethrow. */
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();
}


/*
 * freeDisorderedPackets
 * 		Return any out-of-order packets still parked in a connection's
 * 		packet queue to the receive-side free buffer list.
 *
 * Used during teardown: these packets were received ahead of sequence and
 * never consumed, so their buffers must go back to rx_buffer_pool.
 */
static void
freeDisorderedPackets(MotionConn *conn)
{
	int			slot;

	/* queue may never have been allocated (or was already freed) */
	if (conn->pkt_q == NULL)
		return;

	for (slot = 0; slot < Gp_interconnect_queue_depth; slot++)
	{
		icpkthdr   *pkt = (icpkthdr *) conn->pkt_q[slot];

		if (pkt == NULL)
			continue;

		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog(DEBUG1, "CLEAR Out-of-order PKT: conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, pkt->seq, pkt->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);

		/* hand the buffer back to the free list and clear the slot */
		putRxBufferToFreeList(&rx_buffer_pool, pkt);
		conn->pkt_q[slot] = NULL;
	}
}

/*
 * chunkTransportStateEntryInitialized
 *  	Check whether the transport state entry is initialized.
 *
 * motNodeID is 1-based: entry i lives at states[i - 1].  Returns false
 * for any id outside [1, size] so callers (e.g. teardown) can probe
 * safely without risking an out-of-bounds access.
 */
static bool
chunkTransportStateEntryInitialized(ChunkTransportState *transportStates,
						  int16 motNodeID)
{
	/*
	 * Reject out-of-range ids up front.  The original check only guarded
	 * the upper bound; motNodeID < 1 would have indexed states[-1].
	 */
	if (motNodeID < 1 || motNodeID > transportStates->size)
		return false;

	return transportStates->states[motNodeID - 1].valid;
}

/*
 * computeNetworkStatistics
 * 		Fold one sample into running min/max/sum aggregates.
 *
 * The caller divides *sum by the sample count afterwards to get the
 * average.
 */
static inline void
computeNetworkStatistics(uint64 value, uint64 *min, uint64 *max, double *sum)
{
	*max = (value >= *max) ? value : *max;
	*min = (value <= *min) ? value : *min;
	*sum += value;
}

/*
 * TeardownUDPInterconnect_Internal
 * 		Helper function for TeardownUDPInterconnect.
 *
 * Developers should pay attention to:
 *
 * 1) Do not handle interrupts/throw errors in Teardown, otherwise, Teardown may be called twice.
 *    It will introduce an undefined behavior. And memory leaks will be introduced.
 *
 * 2) Be careful about adding elog/ereport/write_log in Teardown function,
 *    esp, out of HOLD_INTERRUPTS/RESUME_INTERRUPTS pair, since elog/ereport/write_log may
 *    handle interrupts.
 *
 */
static void
TeardownUDPInterconnect_Internal(ChunkTransportState *transportStates,
						MotionLayerState *mlStates,
						bool forceEOS)
{
	ChunkTransportStateEntry *pEntry = NULL;
	int			i;
	Slice	   *mySlice;
	MotionConn *conn;

	/* per-teardown RTT statistics aggregated over all sender connections */
	uint64 maxRtt = 0;
	double avgRtt = 0;
	uint64 minRtt = ~((uint64)0);

	/* per-teardown RTT deviation statistics (same aggregation) */
	uint64 maxDev = 0;
	double avgDev = 0;
	uint64 minDev = ~((uint64)0);

	bool   isReceiver = false;

	/*
	 * Teardown must never throw (see the function header), so missing
	 * state is logged and silently tolerated.
	 */
	if (transportStates == NULL || transportStates->sliceTable == NULL)
	{
		elog(LOG, "TeardownUDPInterconnect: missing slice table.");
		return;
	}

	if (!transportStates->states)
	{
		elog(LOG, "TeardownUDPInterconnect: missing states.");
		return;
	}

	mySlice = (Slice *) list_nth(transportStates->sliceTable->slices, transportStates->sliceId);

	/* interrupts stay held for the whole cleanup; see header comment */
	HOLD_INTERRUPTS();

	/* Log the start of TeardownInterconnect. */
	if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
	{
		int		elevel = 0;

		if (forceEOS || !transportStates->activated)
		{
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elevel = LOG;
			else
				elevel = DEBUG1;
		}
		else if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elevel = DEBUG4;

		if (elevel)
			ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
								 "%s; setup was %s",
								 GetQEIndex(), mySlice->sliceIndex,
								 forceEOS ? "force" : "normal",
								 transportStates->activated ? "completed" : "exited")));

		/* if setup did not complete, log the slicetable */
		if (!transportStates->activated &&
			gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog_node_display(DEBUG3, "local slice table", transportStates->sliceTable, true);
	}

	/*
	 * add lock to protect the hash table, since background thread is still working.
	 */
    pthread_mutex_lock(&ic_control_info.lock);

    if (gp_interconnect_cache_future_packets)
    	cleanupStartupCache();

    /*
     * Now "normal" connections which made it through our
     * peer-registration step. With these we have to worry about
     * "in-flight" data.
     */
    if (mySlice->parentIndex != -1)
    {
    	Slice	   *parentSlice;

    	parentSlice = (Slice *) list_nth(transportStates->sliceTable->slices, mySlice->parentIndex);

    	/* cleanup a Sending motion node. */
    	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
    			elog(DEBUG1, "Interconnect seg%d slice%d closing connections to slice%d (%d peers)",
    					GetQEIndex(), mySlice->sliceIndex, mySlice->parentIndex,
    					list_length(parentSlice->primaryProcesses));

    	/*
    	 * In the olden days, we required that the error case
    	 * successfully transmit and end-of-stream message here. But
    	 * the introduction of cdbdisp_check_estate_for_cancel()
    	 * alleviates for the QD case, and the cross-connection of
    	 * writer gangs in the dispatcher (propagation of cancel
    	 * between them) fixes the I-S case.
    	 *
    	 * So the call to forceEosToPeers() is no longer required.
    	 */
    	if (chunkTransportStateEntryInitialized(transportStates, mySlice->sliceIndex))
    	{
    		/* now it is safe to remove. */
    		pEntry = removeChunkTransportState(transportStates, mySlice->sliceIndex);

    		/* close the send socket */
    		if (pEntry->txfd >= 0)
    			closesocket(pEntry->txfd);
    		pEntry->txfd = -1;
    		pEntry->txfd_family = 0;

    		/* connection array allocation may fail in interconnect setup. */
    		if (pEntry->conns)
    		{
				for (i = 0; i < pEntry->numConns; i++)
				{
					conn = pEntry->conns + i;
					if (conn->cdbProc == NULL)
						continue;

					/* compute some statistics */
					computeNetworkStatistics(conn->rtt, &minRtt, &maxRtt, &avgRtt);
					computeNetworkStatistics(conn->dev, &minDev, &maxDev, &avgDev);

					/* drop queued/unacked send buffers back to the pool */
					icBufferListReturn(&conn->sndQueue, false);
					icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);

					connDelHash(&ic_control_info.connHtab, conn);
				}
				/*
				 * NOTE(review): averages divide by numConns, which also
				 * counts connections skipped above (cdbProc == NULL) —
				 * presumably acceptable for a log-only statistic.
				 */
				avgRtt = avgRtt / pEntry->numConns;
				avgDev = avgDev / pEntry->numConns;

				/* free all send side buffers */
				cleanSndBufferPool(&snd_buffer_pool);
    		}
    	}
#ifdef TRANSFER_PROTOCOL_STATS
		dumpTransProtoStats();
#endif

	}

	/* Previously, there is a piece of code that deals with pending stops.
	 * Now it is delegated to background rx thread which will deal with any
	 * mismatched packets.
	 */

	/*
	 * cleanup all of our Receiving Motion nodes, these get closed
	 * immediately (the receiver know for real if they want to shut
	 * down -- they aren't going to be processing any more data).
	 */
	ListCell   *cell;
	foreach(cell, mySlice->children)
	{
		Slice	*aSlice;
		int		childId = lfirst_int(cell);

		aSlice = (Slice *) list_nth(transportStates->sliceTable->slices, childId);

		/*
		 * First check whether the entry is initialized to avoid the potential
		 * errors thrown out from the removeChunkTransportState, which may
		 * introduce some memory leaks.
		 */
    	if (chunkTransportStateEntryInitialized(transportStates, aSlice->sliceIndex))
    	{
			/* remove it */
			pEntry = removeChunkTransportState(transportStates, aSlice->sliceIndex);
			Assert(pEntry);

			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "Interconnect closing connections from slice%d",
					 aSlice->sliceIndex);
			isReceiver = true;

			if (pEntry->conns)
			{
				/*
				 * receivers know that they no longer care about data from
				 * below ... so we can safely discard data queued in both
				 * directions
				 */
				for (i = 0; i < pEntry->numConns; i++)
				{
					conn = pEntry->conns + i;
					if (conn->cdbProc == NULL)
						continue;

					/* this connection's share of rx buffers is no longer needed */
					rx_buffer_pool.maxCount -= Gp_interconnect_queue_depth;

					/* out of memory has occurred, break out */
					if (!conn->pkt_q)
						break;

					connDelHash(&ic_control_info.connHtab, conn);

					/* putRxBufferAndSendAck() dequeues messages and moves them to pBuff */
					while (conn->pkt_q_size > 0)
					{
						putRxBufferAndSendAck(conn, NULL);
					}

					/* we also need to clear all the out-of-order packets */
					freeDisorderedPackets(conn);

					/* free up the packet queue */
					pfree(conn->pkt_q);
					conn->pkt_q = NULL;
				}
				pfree(pEntry->conns);
				pEntry->conns = NULL;
			}
    	}
	}

	/* now that we've moved active rx-buffers to the freelist, we can prune the freelist itself */
	while (rx_buffer_pool.count > rx_buffer_pool.maxCount)
	{
		icpkthdr *buf = NULL;

		/* If this happened, there is some memory leaks.. */
		if (rx_buffer_pool.freeList == NULL)
		{
			/* release the lock before FATAL so we don't deadlock later */
			pthread_mutex_unlock(&ic_control_info.lock);
			elog(FATAL, "freelist NULL: count %d max %d buf %p", rx_buffer_pool.count, rx_buffer_pool.maxCount, rx_buffer_pool.freeList);
		}

		buf = getRxBufferFromFreeList(&rx_buffer_pool);
		freeRxBuffer(&rx_buffer_pool, buf);
	}

	/*
	 * Update the history of interconnect instance id.
	 */
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		updateCursorIcEntry(&rx_control_info.cursorHistoryTable, transportStates->sliceTable->ic_instance_id, 0);
	}
	else if (Gp_role == GP_ROLE_EXECUTE)
	{
		rx_control_info.lastTornIcId = transportStates->sliceTable->ic_instance_id;
	}

	/* one-shot summary of this interconnect instance's statistics */
	elog((gp_interconnect_log_stats ? LOG: DEBUG1), "Interconnect State: "
			"isSender %d isReceiver %d "
			"snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d "
			"UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %d DEFAULT_RTT %d "
			"forceEOS %d, gp_interconnect_id %d ic_id_last_teardown %d "
			"snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d "
			"snd_pkt_count %d retransmits %d crc_errors %d"
			" recv_pkt_count %d recv_ack_num %d"
			" recv_queue_size_avg %f"
			" capacity_avg %f"
			" freebuf_avg %f "
			"mismatch_pkt_num %d disordered_pkt_num %d duplicated_pkt_num %d"
			" rtt/dev [" UINT64_FORMAT "/" UINT64_FORMAT ", %f/%f, " UINT64_FORMAT "/" UINT64_FORMAT "] "
			" cwnd %f status_query_msg_num %d",
			ic_control_info.isSender, isReceiver,
			Gp_interconnect_snd_queue_depth, Gp_interconnect_queue_depth, Gp_max_packet_size,
			UNACK_QUEUE_RING_SLOTS_NUM, TIMER_SPAN, DEFAULT_RTT,
			forceEOS, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId,
			snd_buffer_pool.count, snd_buffer_pool.maxCount, ic_control_info.socketSendBufferSize, ic_control_info.socketRecvBufferSize,
			ic_statistics.sndPktNum, ic_statistics.retransmits, ic_statistics.crcErrors,
			ic_statistics.recvPktNum, ic_statistics.recvAckNum,
			(double)((double)ic_statistics.totalRecvQueueSize)/((double)ic_statistics.recvQueueSizeCountingTime),
			(double)((double)ic_statistics.totalCapacity)/((double)ic_statistics.capacityCountingTime),
			(double)((double)ic_statistics.totalBuffers)/((double)ic_statistics.bufferCountingTime),
			ic_statistics.mismatchNum, ic_statistics.disorderedPktNum, ic_statistics.duplicatedPktNum,
			(minRtt == ~((uint64)0) ? 0 : minRtt), (minDev == ~((uint64)0) ? 0 : minDev), avgRtt, avgDev, maxRtt, maxDev,
			snd_control_info.cwnd, ic_statistics.statusQueryMsgNum);

	ic_control_info.isSender = false;
	memset(&ic_statistics, 0, sizeof(ICStatistics));

	pthread_mutex_unlock(&ic_control_info.lock);

	/* reset the rx thread network error flag */
	resetRxThreadError();

	transportStates->activated = false;
	transportStates->sliceTable = NULL;

	/*
	 * NOTE(review): transportStates was already dereferenced above and
	 * checked non-NULL at entry, so this guard is redundant (kept as-is).
	 */
	if (transportStates != NULL)
	{
		if (transportStates->states != NULL)
		{
			pfree(transportStates->states);
			transportStates->states = NULL;
		}
		pfree(transportStates);
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
		elog(DEBUG1, "TeardownUDPInterconnect successful");

	RESUME_INTERRUPTS();
}
/*
 * TeardownUDPInterconnect
 * 		Tear down UDP interconnect.
 *
 * This function is called to release the resources used by interconnect.
 * It is an exception-safety wrapper: on an error thrown from the internal
 * routine, both interconnect mutexes are released before rethrowing.
 */
void
TeardownUDPInterconnect(ChunkTransportState *transportStates,
						MotionLayerState *mlStates,
						bool forceEOS)
{
	PG_TRY();
	{
		TeardownUDPInterconnect_Internal(transportStates, mlStates, forceEOS);

		/*
		 * Debug-build check that neither mutex is still held:
		 * pthread_mutex_unlock() returns non-zero when the mutex is not
		 * locked by us, so a "failing" unlock is the expected state here.
		 */
		Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/* error path may still hold the locks; drop them before rethrow */
		pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();
}

/*
 * prepareRxConnForRead
 * 		Prepare the receive connection for reading: point the connection's
 * 		read cursor (pBuff/msgPos/msgSize/recvBytes) at the packet at the
 * 		head of the connection's packet queue.
 *
 * MUST BE CALLED WITH ic_control_info.lock LOCKED (this guards pkt_q,
 * which the background rx thread also manipulates).
 */
static void
prepareRxConnForRead(MotionConn *conn)
{

	elog(DEBUG3, "In prepareRxConnForRead: conn %p, q_head %d q_tail %d q_size %d", conn, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q_size);

	/* caller guarantees the head slot holds a packet */
	Assert(conn->pkt_q[conn->pkt_q_head] != NULL);
	conn->pBuff = conn->pkt_q[conn->pkt_q_head];
	conn->msgPos = conn->pBuff;
	conn->msgSize = ((icpkthdr *)conn->pBuff)->len;
	conn->recvBytes = conn->msgSize;
}

/*
 * receiveChunksUDP
 * 		Receive chunks from the senders
 *
 * MUST BE CALLED WITH ic_control_info.lock LOCKED; releases the lock
 * before returning tuple chunks to the caller.
 *
 * If conn is non-NULL the receive is "directed" at that route; otherwise
 * data is accepted from any route and *srcRoute reports where it came from.
 */
static TupleChunkListItem
receiveChunksUDP(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
				 int16 motNodeID, int16 *srcRoute, MotionConn *conn, bool inTeardown)
{
	int			retries = 0;
	bool		directed = false;
	MotionConn *rxconn = NULL;
	TupleChunkListItem	tcItem=NULL;

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "receivechunksUDP: motnodeid %d", motNodeID);
#endif

	Assert(pTransportStates);
	Assert(pTransportStates->sliceTable);

	/* register with the rx thread what we are waiting for */
	if (conn != NULL)
	{
		directed = true;
		*srcRoute = conn->route;
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, conn->route,
								pTransportStates->sliceTable->ic_instance_id);
	}
	else
	{
		/* non-directed receive */
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, ANY_ROUTE,
								pTransportStates->sliceTable->ic_instance_id);
	}

	/* we didn't have any data, so we've got to read it from the network. */
	for (;;)
	{
		/* 1. Do we have data ready */
		if (rx_control_info.mainWaitingState.reachRoute != ANY_ROUTE)
		{
			/* rx thread signalled arrival on this route */
			rxconn = pEntry->conns + rx_control_info.mainWaitingState.reachRoute;

			prepareRxConnForRead(rxconn);

			elog(DEBUG2, "receiveChunksUDP: non-directed rx woke on route %d", rx_control_info.mainWaitingState.reachRoute);
			resetMainThreadWaiting(&rx_control_info.mainWaitingState);
		}

		aggregateStatistics(pEntry);

		if (rxconn != NULL)
		{
			Assert(rxconn->pBuff);

			/* drop the lock before doing tuple-chunk work */
			pthread_mutex_unlock(&ic_control_info.lock);

			elog(DEBUG2, "got data with length %d", rxconn->recvBytes);
			/* successfully read into this connection's buffer. */
			tcItem = RecvTupleChunk(rxconn, inTeardown);

			if (!directed)
				*srcRoute = rxconn->route;

			return tcItem;
		}

		retries++;

		/* 2. Wait for data to become ready */
		if (waitOnCondition(MAIN_THREAD_COND_TIMEOUT, &ic_control_info.cond, &ic_control_info.lock))
		{
			continue; /* success ! */
		}

		/* handle timeout, check for cancel */
		pthread_mutex_unlock(&ic_control_info.lock);

		/* check the potential errors in rx thread. */
		checkRxThreadError();

		/* do not check interrupts when holding the lock */
		ML_CHECK_FOR_INTERRUPTS(inTeardown);

		/* check to see if the dispatcher should cancel */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			checkForCancelFromQD(pTransportStates);
		}

		/* NIC on master (and thus the QD connection) may become bad, check it. */
		if ((retries & 0x3f) == 0)
			checkQDConnectionAlive();

		/* retake the lock before waiting again */
		pthread_mutex_lock(&ic_control_info.lock);

	} /* for (;;) */

	/* We either got data, or get cancelled. We never make it out to
	 * here. */
	return NULL; /* make GCC behave */
}

/*
 * RecvTupleChunkFromAnyUDP_Internal
 * 		Receive tuple chunks from any route (connections)
 *
 * Scans the connections round-robin starting at pEntry->scanStart; if a
 * queued packet is found it is consumed directly, otherwise we block in
 * receiveChunksUDP().  Returns NULL when no connection is still active.
 */
static inline TupleChunkListItem
RecvTupleChunkFromAnyUDP_Internal(MotionLayerState *mlStates,
						 ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 *srcRoute)
{
	ChunkTransportStateEntry	*pEntry = NULL;
	MotionConn			*conn=NULL;
	int					i, index, activeCount=0;
	TupleChunkListItem	tcItem=NULL;
	bool				found = false;

	if (!transportStates)
	{
		elog(FATAL, "RecvTupleChunkFromAnyUDP: missing context");
	}
	else if (!transportStates->activated)
	{
		elog(FATAL, "RecvTupleChunkFromAnyUDP: interconnect context not active!");
	}

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* resume the round-robin scan where the last call left off */
	index = pEntry->scanStart;

	pthread_mutex_lock(&ic_control_info.lock);

	for (i = 0; i < pEntry->numConns; i++, index++)
	{
		/* wrap around the connection array */
		if (index >= pEntry->numConns)
			index = 0;

		conn = pEntry->conns + index;

		if (conn->stillActive)
			activeCount++;

		ic_statistics.totalRecvQueueSize += conn->pkt_q_size;
		ic_statistics.recvQueueSizeCountingTime++;

		/* take the first connection with a queued packet */
		if (conn->pkt_q_size > 0)
		{
			found = true;
			prepareRxConnForRead(conn);
			break;
		}
	}

	if (found)
	{
		pthread_mutex_unlock(&ic_control_info.lock);

		tcItem = RecvTupleChunk(conn, transportStates->teardownActive);
		*srcRoute = conn->route;
		/* next call starts scanning after the route we just served */
		pEntry->scanStart = index + 1;
		return tcItem;
	}

	/* no data pending in our queue */

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "RecvTupleChunkFromAnyUDP(): activeCount is %d", activeCount);
#endif
	/* all senders are done: nothing more will arrive */
	if (activeCount == 0)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		return NULL;
	}

	/* receiveChunksUDP() releases ic_control_info.lock as a side-effect */
	tcItem = receiveChunksUDP(transportStates, pEntry, motNodeID, srcRoute, NULL, transportStates->teardownActive);

	pEntry->scanStart = *srcRoute + 1;

	return tcItem;
}

/*
 * RecvTupleChunkFromAnyUDP
 * 		Receive tuple chunks from any route (connections)
 *
 * Exception-safety wrapper: guarantees the interconnect mutexes are not
 * left held if the internal routine throws.
 */
static TupleChunkListItem
RecvTupleChunkFromAnyUDP(MotionLayerState *mlStates,
						 ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 *srcRoute)
{
	TupleChunkListItem icItem = NULL;

	PG_TRY();
	{
		icItem = RecvTupleChunkFromAnyUDP_Internal(mlStates, transportStates, motNodeID, srcRoute);

		/*
		 * error if mutex still held (debug build only): unlock returns
		 * non-zero when the mutex is not held by us, which is the
		 * expected state on a clean return.
		 */
        Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
        pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return icItem;
}

/*
 * RecvTupleChunkFromUDP_Internal
 * 		Receive tuple chunks from a specific route (connection)
 *
 * Returns NULL when the connection is no longer active; otherwise consumes
 * a queued packet, or blocks in receiveChunksUDP() until one arrives.
 */
static inline TupleChunkListItem
RecvTupleChunkFromUDP_Internal(ChunkTransportState *transportStates,
					  int16		motNodeID,
					  int16		srcRoute)
{
	ChunkTransportStateEntry	*pEntry = NULL;
	MotionConn			*conn=NULL;
	/* filled in by receiveChunksUDP() for the directed receive below */
	int16				route;

	if (!transportStates)
	{
		elog(FATAL, "RecvTupleChunkFromUDP: missing context");
	}
	else if (!transportStates->activated)
	{
		elog(FATAL, "RecvTupleChunkFromUDP: interconnect context not active!");
	}

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "RecvTupleChunkFromUDP().");
#endif

	/* check em' */
	ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "RecvTupleChunkFromUDP(motNodID=%d, srcRoute=%d)", motNodeID, srcRoute);
#endif

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = pEntry->conns + srcRoute;

#ifdef AMS_VERBOSE_LOGGING
	if (!conn->stillActive)
	{
		elog(LOG, "RecvTupleChunkFromUDP(): connection inactive ?!");
	}
#endif

	pthread_mutex_lock(&ic_control_info.lock);

	/* stillActive is shared with the rx thread; re-check under the lock */
	if (!conn->stillActive)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		return NULL;
	}

	ic_statistics.totalRecvQueueSize += conn->pkt_q_size;
	ic_statistics.recvQueueSizeCountingTime++;

	/* fast path: a packet is already queued at the head */
	if (conn->pkt_q[conn->pkt_q_head] != NULL)
	{
		prepareRxConnForRead(conn);

		pthread_mutex_unlock(&ic_control_info.lock);

		TupleChunkListItem	tcItem=NULL;

		tcItem = RecvTupleChunk(conn, transportStates->teardownActive);

		return tcItem;
	}

	/* no existing data, we've got to read a packet */
	/* receiveChunksUDP() releases ic_control_info.lock as a side-effect */

	TupleChunkListItem chunks = receiveChunksUDP(transportStates, pEntry, motNodeID, &route, conn, transportStates->teardownActive);

	return chunks;
}

/*
 * RecvTupleChunkFromUDP
 * 		Receive tuple chunks from a specific route (connection)
 *
 * Exception-safety wrapper: guarantees the interconnect mutexes are not
 * left held if the internal routine throws.
 */
static TupleChunkListItem
RecvTupleChunkFromUDP(ChunkTransportState *transportStates,
					  int16		motNodeID,
					  int16		srcRoute)
{
	TupleChunkListItem icItem = NULL;

	PG_TRY();
	{
		icItem = RecvTupleChunkFromUDP_Internal(transportStates, motNodeID, srcRoute);

		/*
		 * error if mutex still held (debug build only): unlock returns
		 * non-zero when the mutex is not held by us, which is the
		 * expected state on a clean return.
		 */
        Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
        pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return icItem;
}

/*
 * markUDPConnInactive
 * 		Flag a connection as no longer active.
 *
 * Done under ic_control_info.lock so the background rx thread, which also
 * reads stillActive, sees a consistent value.
 */
void
markUDPConnInactive(MotionConn *conn)
{
	pthread_mutex_lock(&ic_control_info.lock);
	conn->stillActive = false;
	pthread_mutex_unlock(&ic_control_info.lock);
}

/*
 * aggregateStatistics
 * 		Recompute the entry-level ack/resend/drop statistics by folding in
 * 		the per-connection counters of every connection the entry owns.
 */
static void
aggregateStatistics(ChunkTransportStateEntry *pEntry)
{
	int			i;

	/* reset the aggregates, then accumulate each connection's counters */
	pEntry->stat_total_ack_time = 0;
	pEntry->stat_count_acks = 0;
	pEntry->stat_max_ack_time = 0;
	pEntry->stat_min_ack_time = ~((uint64) 0);
	pEntry->stat_count_resent = 0;
	pEntry->stat_max_resent = 0;
	pEntry->stat_count_dropped = 0;

	for (i = 0; i < pEntry->numConns; i++)
	{
		MotionConn *mConn = pEntry->conns + i;

		pEntry->stat_total_ack_time += mConn->stat_total_ack_time;
		pEntry->stat_count_acks += mConn->stat_count_acks;
		pEntry->stat_max_ack_time = Max(pEntry->stat_max_ack_time, mConn->stat_max_ack_time);
		pEntry->stat_min_ack_time = Min(pEntry->stat_min_ack_time, mConn->stat_min_ack_time);
		pEntry->stat_count_resent += mConn->stat_count_resent;
		pEntry->stat_max_resent = Max(pEntry->stat_max_resent, mConn->stat_max_resent);
		pEntry->stat_count_dropped += mConn->stat_count_dropped;
	}
}

/*
 * logPkt
 * 		Log a packet header's fields via write_log (thread-safe logging).
 *
 * prefix is prepended to the message; the ACK/DATA tag is derived from the
 * direction flag in the header.
 */
static inline void
logPkt(char *prefix, icpkthdr *pkt)
{
	/*
	 * Field labels fixed to match the values actually printed:
	 * "dstDesContentId" -> "dstContentId" (prints pkt->dstContentId) and
	 * "dstListernerPort" -> "dstListenerPort" (spelling).
	 */
	write_log("%s [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
			"srcContentId %d dstContentId %d "
			"srcPid %d dstPid %d "
			"srcListenerPort %d dstListenerPort %d "
			"sendSliceIndex %d recvSliceIndex %d "
			"sessionId %d icId %d "
			"flags %d ",
			prefix, pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
			pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
			pkt->srcContentId, pkt->dstContentId,
			pkt->srcPid, pkt->dstPid,
			pkt->srcListenerPort, pkt->dstListenerPort,
			pkt->sendSliceIndex, pkt->recvSliceIndex,
			pkt->sessionId, pkt->icId,
			pkt->flags);
}

/*
 * handleAckedPacket
 * 		Called by sender to process acked packet.
 *
 * Removes the buffer from the unack queue (and, for loss-based flow
 * control, from the unack queue ring), updates the smoothed RTT/deviation
 * estimates and the congestion window, and returns the buffer to the send
 * free list.
 */
static void inline
handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now)
{
	uint64 ackTime = 0;

	/* remember whether this was the oldest unacked packet */
	bool bufIsHead = (&buf->primary == icBufferListFirst(&ackConn->unackQueue));

	buf = icBufferListDelete(&ackConn->unackQueue, buf);

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
		unack_queue_ring.numOutStanding--;
		if (icBufferListLength(&ackConn->unackQueue) >= 1)
			unack_queue_ring.numSharedOutStanding--;

		ackTime = now - buf->sentTime;

		/* In udp_testmode, we do not change rtt dynamically due to the
		 * large number of packet losses introduced by fault injection code.
		 * This can decrease the testing time.
		 */
#ifdef USE_ASSERT_CHECKING
		if (!udp_testmode)
#endif
		{
			uint64 newRTT = 0;
			uint64 newDEV = 0;

			/* only first-transmission samples update the estimators (Karn) */
			if (buf->nRetry == 0)
			{
				/* newRTT = buf->conn->rtt * (1 - RTT_COEFFICIENT) + ackTime * RTT_COEFFICIENT; */
				newRTT = buf->conn->rtt - (buf->conn->rtt >> RTT_SHIFT_COEFFICIENT) + (ackTime >> RTT_SHIFT_COEFFICIENT);
				newRTT = Min(MAX_RTT, Max(newRTT, MIN_RTT));
				buf->conn->rtt = newRTT;

				/*
				 * newDEV = buf->conn->dev * (1 - DEV_COEFFICIENT) + DEV_COEFFICIENT * |ackTime - newRTT|
				 *
				 * Compute the absolute difference in unsigned 64-bit
				 * arithmetic.  The previous code passed the (wrapping)
				 * uint64 difference to abs(), which takes an int: the
				 * value was truncated to 32 bits before the magnitude was
				 * taken, corrupting the deviation estimate for large
				 * differences.
				 */
				uint64 rttDelta = (ackTime > newRTT) ? (ackTime - newRTT) : (newRTT - ackTime);

				newDEV = buf->conn->dev - (buf->conn->dev >> DEV_SHIFT_COEFFICIENT) + (rttDelta >> DEV_SHIFT_COEFFICIENT);
				newDEV = Min(MAX_DEV, Max(newDEV, MIN_DEV));
				buf->conn->dev = newDEV;

				/* adjust the congestion control window. */
				if (snd_control_info.cwnd < snd_control_info.ssthresh)
					snd_control_info.cwnd += 1;
				else
					snd_control_info.cwnd += 1/snd_control_info.cwnd;
				snd_control_info.cwnd = Min(snd_control_info.cwnd, snd_buffer_pool.maxCount);
			}
		}
	}

	buf->conn->stat_total_ack_time += ackTime;
	buf->conn->stat_max_ack_time = Max(ackTime, buf->conn->stat_max_ack_time);
	buf->conn->stat_min_ack_time = Min(ackTime, buf->conn->stat_min_ack_time);

	/* only change receivedAckSeq when it is the smallest pkt we sent and
	 * have not received ack for it.
	 */
	if (bufIsHead)
		ackConn->receivedAckSeq = buf->pkt->seq;

	/* The first packet acts like a connect setup packet */
	if (buf->pkt->seq == 1)
		ackConn->state = mcsStarted;

	icBufferListAppend(&snd_buffer_pool.freeList, buf);

#ifdef AMS_VERBOSE_LOGGING
	write_log("REMOVEPKT %d from unack queue for route %d (retry %d) sndbufmaxcount %d sndbufcount %d sndbuffreelistlen %d, sntSeq %d consumedSeq %d recvAckSeq %d capacity %d, sndQ %d, unackQ %d", buf->pkt->seq, ackConn->route, buf->nRetry, snd_buffer_pool.maxCount, snd_buffer_pool.count, icBufferListLength(&snd_buffer_pool.freeList), buf->conn->sentSeq, buf->conn->consumedSeq, buf->conn->receivedAckSeq, buf->conn->capacity, icBufferListLength(&buf->conn->sndQueue), icBufferListLength(&buf->conn->unackQueue));
	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		icBufferListLog(&buf->conn->unackQueue);
		icBufferListLog(&buf->conn->sndQueue);
	}
#endif
}

/*
 * handleAcks
 * 		handle acks incoming from our upstream peers.
 *
 * Drains the tx socket of pending ack packets, validating each one
 * (size, CRC, identity) and dispatching on its flags: capacity updates,
 * duplicate/disorder notifications, stop requests, and regular cumulative
 * acks.  Returns when the socket has nothing more to read.
 *
 * if we receive a stop message, return true (caller will clean up).
 */
static bool
handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry)
{

	bool ret = false;
	MotionConn *ackConn = NULL;
	int n;

	struct sockaddr_storage peer;
	socklen_t peerlen;

	/* acks are read into the shared ack staging buffer */
	struct icpkthdr *pkt = snd_control_info.ackBuffer;


	bool shouldSendBuffers = false;

	for (;;)
	{

		/* ready to read on our socket ? */
		peerlen = sizeof(peer);
		n = recvfrom(pEntry->txfd, (char *)pkt, MIN_PACKET_SIZE, 0,
					 (struct sockaddr *)&peer, &peerlen);

		if (n < 0)
		{
			if (errno == EWOULDBLOCK) /* had nothing to read. */
			{
				aggregateStatistics(pEntry);
				return ret;
			}

			ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
			if (errno == EINTR)
				continue;

			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg("Interconnect error waiting for peer ack"),
							errdetail("During recvfrom() call.\n")));
			/* not reached */
		}
		else if (n < sizeof(struct icpkthdr))
		{
			/* runt datagram: too short to even hold a header; drop it */
			continue;
		}
		else if (n != pkt->len)
		{
			/* datagram size disagrees with the header's claim; drop it */
			continue;
		}

		/*
		 * check the CRC of the payload.
		 */
		if (gp_interconnect_full_crc)
		{
			if (!checkCRC(pkt))
			{
				gp_atomic_add_32(&ic_statistics.crcErrors, 1);
				if (DEBUG2 >= log_min_messages)
					write_log("received network data error, dropping bad packet, user data unaffected.");
				continue;
			}
		}

#ifdef AMS_VERBOSE_LOGGING
		/* NOTE(review): this calls log_pkt but the helper below is named
		 * logPkt — presumably a macro/alias elsewhere; verify before
		 * enabling AMS_VERBOSE_LOGGING. */
		log_pkt("GOT ACK", pkt);
#endif


		/* read packet, is this the ack we want ?
		 *
		 * Here, using gp_interconnect_id is safe, since
		 * only senders get acks. QD (never be a sender) does not. QD may
		 * have several concurrent running interconnect
		 * instances.
		 */
		if (pkt->srcContentId == GetQEIndex() &&
				pkt->srcPid == MyProcPid &&
				pkt->srcListenerPort == ((Gp_listener_port>>16) & 0x0ffff) &&
				pkt->sessionId == gp_session_id &&
				pkt->icId == gp_interconnect_id)
		{

			/* packet is for me. Note here we do not need to get a connection lock here,
			 * since background rx thread only read the hash table.
			 */
			ackConn = findConnByHeader(&ic_control_info.connHtab, pkt);

			if (ackConn == NULL)
			{
				elog(LOG, "Received ack for unknown connection (flags 0x%x)", pkt->flags);
				continue;
			}

			ackConn->stat_count_acks++;
			ic_statistics.recvAckNum++;

			/* any ack is progress: reset the deadlock-detection clock */
			uint64 now = getCurrentTime();
			ackConn->deadlockCheckBeginTime = now;

			/* We simply disregard pkt losses (NAK) due to process start race (that is,
			 * sender is started earlier than receiver. rx background thread may receive
			 * packets when connections are not created yet).
			 *
			 * Another option is to resend the packet immediately,
			 * but experiments do not show any benefits.
			 */

			if (pkt->flags & UDPIC_FLAGS_NAK)
				continue;

			/* single-pass flag dispatch; "while" used only so break can exit early */
			while (true)
			{
				if (pkt->flags & UDPIC_FLAGS_CAPACITY)
				{
					/* receiver consumed packets: grow our send window */
					if (pkt->extraSeq > ackConn->consumedSeq)
					{
						ackConn->capacity += pkt->extraSeq - ackConn->consumedSeq;
						ackConn->consumedSeq = pkt->extraSeq;
						shouldSendBuffers = true;
					}
				}
				else if (pkt->flags & UDPIC_FLAGS_DUPLICATE)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("GOTDUPACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

					shouldSendBuffers |= (handleAckForDuplicatePkt(ackConn, pkt));
					break;
				}
				else if (pkt->flags & UDPIC_FLAGS_DISORDER)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("GOTDISORDER [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

					shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, pEntry, ackConn, pkt));
					break;
				}

				/* stale ack: we already advanced past this sequence number */
				if (pkt->seq < ackConn->receivedAckSeq)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
					break;
				}

				/* haven't gotten a stop request, maybe this is one ? */
				if ((pkt->flags & UDPIC_FLAGS_STOP) && !ackConn->stopRequested && ackConn->stillActive)
				{
				#ifdef AMS_VERBOSE_LOGGING
					elog(LOG, "got ack with stop; srcpid %d dstpid %d cmd %d flags 0x%x pktseq %d connseq %d", pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, pkt->seq, ackConn->conn_info.seq);
				#endif
					ackConn->stopRequested = true;
					ackConn->conn_info.flags |= UDPIC_FLAGS_STOP;
					ret = true;
					/* continue to deal with acks */
				}

				/* duplicate of the newest ack we have: nothing to advance */
				if (pkt->seq == ackConn->receivedAckSeq)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
					break;
				}

				/* deal with a regular ack. */
				if (pkt->flags & UDPIC_FLAGS_ACK)
				{
					ICBufferLink *link = NULL;
					ICBufferLink *next = NULL;
					ICBuffer *buf = NULL;

				#ifdef AMS_VERBOSE_LOGGING
					write_log("GOTACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);
				#endif

					/* cumulative ack: retire every unacked buffer up to pkt->seq */
					link = icBufferListFirst(&ackConn->unackQueue);
					buf = GET_ICBUFFER_FROM_PRIMARY(link);

					while (!icBufferListIsHead(&ackConn->unackQueue, link) && buf->pkt->seq <= pkt->seq)
					{
						next = link->next;
						handleAckedPacket(ackConn, buf, now);
						shouldSendBuffers = true;
						link = next;
						buf = GET_ICBUFFER_FROM_PRIMARY(link);
					}
				}
				break;
			}

			/* When there is a capacity increase or some outstanding buffers
			 * removed from the unack queue ring, we should try to send buffers for the connection.
			 * Even when stop is received, we still send here, since in STOP/EOS
			 * race case, we may have been in EOS sending logic and will not check stop message.
			 */
			if (shouldSendBuffers)
				sendBuffers(transportStates, pEntry, ackConn);
		}
		else
			if (DEBUG1 >= log_min_messages)
				write_log("handleAck: not the ack we're looking for (flags 0x%x)...mot(%d) content(%d:%d) srcpid(%d:%d) dstpid(%d) srcport(%d:%d) dstport(%d) sess(%d:%d) cmd(%d:%d)",
					pkt->flags, pkt->motNodeId,
					pkt->srcContentId, GetQEIndex(),
					pkt->srcPid, MyProcPid,
					pkt->dstPid,
					pkt->srcListenerPort, ((Gp_listener_port>>16) & 0x0ffff),
					pkt->dstListenerPort,
					pkt->sessionId, gp_session_id,
					pkt->icId, gp_interconnect_id);
	}
}

/*
 * addCRC
 * 		Stamp the packet with a CRC32C computed over the whole packet
 * 		(pkt->len bytes).  The caller is expected to have zeroed pkt->crc
 * 		beforehand so the field does not perturb the checksum.
 */
static inline void
addCRC(icpkthdr *pkt)
{
	pg_crc32	crc;

	INIT_CRC32C(crc);
	COMP_CRC32C(crc, pkt, pkt->len);
	FIN_CRC32C(crc);

	pkt->crc = crc;
}

/*
 * checkCRC
 * 		Verify the CRC32C checksum carried in a received packet.
 *
 * Returns true when the transmitted checksum matches the one recomputed
 * locally over pkt->len bytes.  As a side effect, pkt->crc is cleared so
 * that the recomputation sees the same zeroed field the sender used
 * (see addCRC); the field is left zeroed afterwards.
 */
static inline bool
checkCRC(icpkthdr *pkt)
{
	pg_crc32	expected;
	pg_crc32	actual;

	/* pull out the transmitted value and recompute with crc zeroed */
	expected = pkt->crc;
	pkt->crc = 0;

	INIT_CRC32C(actual);
	COMP_CRC32C(actual, pkt, pkt->len);
	FIN_CRC32C(actual);

	return expected == actual;
}


/*
 * prepareXmit
 * 		Finalize the packet header of a connection's current buffer
 * 		before it is handed to the send path.
 *
 * Copies the connection's header template (with the final message length
 * and a zeroed crc field) into the outgoing buffer, bumps the sequence
 * number that the *next* packet on this connection will carry, and, when
 * full CRC checking is enabled, stamps a CRC32C over the packet.
 */
static inline void
prepareXmit(MotionConn *conn)
{
	Assert(conn != NULL);

	conn->conn_info.len = conn->msgSize;
	conn->conn_info.crc = 0;

	/* snapshot the header into the wire buffer */
	memcpy(conn->pBuff, &conn->conn_info, sizeof(conn->conn_info));

	/* increase the sequence no */
	conn->conn_info.seq++;

	if (gp_interconnect_full_crc)
		addCRC((icpkthdr *) conn->pBuff);
}

/*
 * sendOnce
 * 		Transmit a single packet buffer on the connection's UDP socket.
 *
 * Error policy: EINTR is retried; EAGAIN (no socket buffer space) is
 * silently tolerated — the packet will be retransmitted later by the
 * expiration machinery; EPERM is logged and tolerated; any other
 * sendto() failure raises an error.  A short transmit is only logged.
 */
static void
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn * conn)
{
	int32		n;

#ifdef USE_ASSERT_CHECKING
	/* fault injection: pretend the network dropped this packet */
	if (testmode_inject_fault(gp_udpic_dropxmit_percent))
	{
	#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
	#endif
		return;
	}
#endif

	for (;;)
	{
		n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
				   (struct sockaddr *) &conn->peer, conn->peer_len);

		if (n >= 0)
			break;

		if (errno == EINTR)
			continue;			/* interrupted: just retry the send */

		if (errno == EAGAIN)	/* no space ? not an error. */
			return;

		if (errno == EPERM)
		{
			ereport(LOG,
					(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					 errmsg("Interconnect error writing an outgoing packet: %m"),
					 errdetail("error during sendto() for Remote Connection: contentId=%d at %s",
							   conn->remoteContentId, conn->remoteHostAndPort)));
			return;
		}

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error writing an outgoing packet: %m"),
						errdetail("error during sendto() call (error:%d).\n"
								  "For Remote Connection: contentId=%d at %s",
								  errno, conn->remoteContentId,
								  conn->remoteHostAndPort)));
		/* not reached */
	}

	if (n != buf->pkt->len)
	{
		if (DEBUG1 >= log_min_messages)
			write_log("Interconnect error writing an outgoing packet [seq %d]: short transmit (given %d sent %d) during sendto() call."
				  "For Remote Connection: contentId=%d at %s", buf->pkt->seq, buf->pkt->len, n,
				  conn->remoteContentId,
				  conn->remoteHostAndPort);
	#ifdef AMS_VERBOSE_LOGGING
		logPkt("PKT DETAILS ", buf->pkt);
	#endif
	}
}


/*
 * handleStopMsgs
 *		handle stop messages.
 *
 * Walk every connection of this motion-node entry; for each connection
 * that is still active and whose receiver requested a stop, send a
 * stop-ack EOS packet and deactivate the connection.  After the last
 * connection has been visited, poll once (non-blocking) for incoming
 * acks; if handling them surfaced new stop requests, rescan from the
 * first connection.
 */
static void
handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId)
{
	int 	i = 0;

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG3, "handleStopMsgs: node %d", motionId);
#endif
	while (i < pEntry->numConns)
	{
		MotionConn *conn=NULL;

		conn = pEntry->conns + i;

#ifdef AMS_VERBOSE_LOGGING
		elog(DEBUG3, "handleStopMsgs: node %d route %d %s %s", motionId, conn->route,
			(conn->stillActive ? "active" : "NOT active"), (conn->stopRequested ? "stop requested" : ""));
		elog(DEBUG3, "handleStopMsgs: node %d route %d msgSize %d", motionId, conn->route, conn->msgSize);
#endif

		/*
		 * MPP-2427: we're guaranteed to have recently flushed, but
		 * this might not be empty (if we got a stop on a buffer that
		 * wasn't the one we were sending) ... empty it first so the
		 * outbound buffer is empty when we get here.
		 */
		if (conn->stillActive && conn->stopRequested)
		{

			/* mark buffer empty */
			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);

			/* now send our stop-ack EOS */
			conn->conn_info.flags |= UDPIC_FLAGS_EOS;

			Assert(conn->curBuff != NULL);

			/* single 'S' byte payload marks the stop acknowledgment */
			conn->pBuff[conn->msgSize] = 'S';
			conn->msgSize += 1;

			prepareXmit(conn);

			/* now ready to actually send */
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "handleStopMsgs: node %d route %d, seq %d", motionId, i, conn->conn_info.seq);

			/* place it into the send queue */
			icBufferListAppend(&conn->sndQueue, conn->curBuff);

			/*
			 * return all buffers (the unack queue's return behavior
			 * depends on the flow-control method in use)
			 */
			icBufferListReturn(&conn->sndQueue, false);
			icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);

			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);

			/* connection is finished: EOS sent, no buffer owned anymore */
			conn->state = mcsEosSent;
			conn->curBuff = NULL;
			conn->pBuff = NULL;
			conn->stillActive = false;
			conn->stopRequested = false;
		}

		i++;

		/*
		 * After the last connection, drain any pending acks once;
		 * newly discovered stop requests restart the scan.
		 */
		if (i == pEntry->numConns)
		{
			if (pollAcks(transportStates, pEntry->txfd, 0))
			{
				if (handleAcks(transportStates, pEntry))
				{
					/* more stops found, loop again.*/
					i = 0;
					continue;
				}
			}
		}
	}
}


/*
 * sendBuffers
 * 		Called by sender to send the buffers in the send queue.
 *
 * Send the buffers in the send queue of the connection if there is capacity left
 * and the congestion control condition is satisfied.
 *
 * Here, we make assure that a connection can have at least one outstanding buffer.
 * This is very important for two reasons:
 *
 * 1) The handling logic of the ack of the outstanding buffer can always send a buffer
 *    in the send queue. Otherwise, there maybe a deadlock.
 * 2) This makes assure that any connection can have a minimum bandwidth for data
 *    sending.
 *
 * After sending a buffer, the buffer will be placed into both the unack queue and
 * the corresponding queue in the unack queue ring.
 */
static void
sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	while (conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
	{
		ICBuffer *buf = NULL;

		/*
		 * Loss-based congestion control: once this connection already has
		 * an outstanding buffer, do not exceed the shared portion of the
		 * congestion window (cwnd minus the reserved minimum).  The first
		 * outstanding buffer per connection is always allowed (see the
		 * function header: every connection gets minimum bandwidth).
		 */
		if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS && (icBufferListLength(&conn->unackQueue) > 0
				&& unack_queue_ring.numSharedOutStanding >= (snd_control_info.cwnd - snd_control_info.minCwnd)))
			break;

		/* for connection setup, we only allow one outstanding packet. */
		if (conn->state == mcsSetupOutgoingConnection && icBufferListLength(&conn->unackQueue) >= 1)
			break;

		buf = icBufferListPop(&conn->sndQueue);

		/* initialize retransmission bookkeeping for this buffer */
		uint64 now = getCurrentTime();
		buf->sentTime = now;
		buf->unackQueueRingSlot = -1;
		buf->nRetry = 0;
		buf->conn = conn;
		conn->capacity--;

		icBufferListAppend(&conn->unackQueue, buf);

		if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
		{
			unack_queue_ring.numOutStanding++;
			/* outstanding buffers beyond the first one count as "shared" */
			if (icBufferListLength(&conn->unackQueue) > 1)
				unack_queue_ring.numSharedOutStanding++;

			/* schedule the buffer for expiration-based retransmission */
			putIntoUnackQueueRing(&unack_queue_ring, buf, computeExpirationPeriod(buf->conn, buf->nRetry), now);
		}

		/*
		 * Note the place of sendOnce here.
		 * If we send before appending it to the unack queue and
		 * putting it into unack queue ring, and there is a
		 * network error occurred in the sendOnce function, error
		 * message will be output. In the time of error message output,
		 * interrupts is potentially checked, if there is a pending query cancel,
		 * it will lead to a dangled buffer (memory leak).
		 */
#ifdef TRANSFER_PROTOCOL_STATS
		updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif

		sendOnce(transportStates, pEntry, buf, conn);
		ic_statistics.sndPktNum++;

#ifdef AMS_VERBOSE_LOGGING
		logPkt("SEND PKT DETAIL", buf->pkt);
#endif

		/* record the seq of the last packet handed to the network */
		buf->conn->sentSeq = buf->pkt->seq;
	}
}

/*
 * handleDisorderPacket
 * 		Called by rx thread to assemble and send a disorder message.
 *
 * In current implementation, we limit the number of lost packet sequence numbers
 * in the disorder message by the MIN_PACKET_SIZE. There are two reasons here:
 *
 * 1) The maximal number of lost packet sequence numbers are actually bounded by the
 *    receive queue depth whose maximal value is very large. Since we share the packet
 *    receive and ack receive in the background thread, the size of disorder should be
 *    also limited by the max packet size.
 * 2) We can use Gp_max_packet_size here to limit the number of lost packet sequence numbers.
 *    But considering we do not want to let senders send many packets when getting a lost
 *    message. Here we use MIN_PACKET_SIZE.
 *
 *
 * the format of a disorder message:
 * I) pkt header
 *  - seq      -> packet sequence number that triggers the disorder message
 *  - extraSeq -> the largest seq of the received packets
 *  - flags    -> UDPIC_FLAGS_DISORDER
 *  - len      -> sizeof(icpkthdr) + sizeof(uint32) * (lost pkt count)
 * II) content
 *  - an array of lost pkt sequence numbers (uint32)
 *
 */
static void
handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt)
{
	int start = 0;
	uint32 lostPktCnt = 0;
	/* lost seqs are written directly after the disorder buffer's header */
	uint32 *curSeq = (uint32 *)&rx_control_info.disorderBuffer[1];
	uint32 maxSeqs = MAX_SEQS_IN_DISORDER_ACK;

#ifdef AMS_VERBOSE_LOGGING
	write_log("PROCESS_DISORDER PKT BEGIN:");
#endif

	/* scan the receive ring from the queue tail up to the disorder position */
	start = conn->pkt_q_tail;

	while (start != pos && lostPktCnt < maxSeqs)
	{
		if (conn->pkt_q[start] == NULL)
		{
			/* empty slot: this sequence number has not arrived, record it */
			*curSeq = tailSeq;
			lostPktCnt++;

		#ifdef AMS_VERBOSE_LOGGING
			/*
			 * Log before advancing curSeq.  The previous code incremented
			 * the pointer first and then dereferenced it in the log call,
			 * printing the (uninitialized) next slot instead of the
			 * sequence number that was just recorded.
			 */
			write_log("PROCESS_DISORDER add seq [%d], lostPktCnt %d", *curSeq, lostPktCnt);
		#endif

			curSeq++;
		}
		tailSeq++;
		start = (start + 1) % Gp_interconnect_queue_depth;
	}

#ifdef AMS_VERBOSE_LOGGING
	write_log("PROCESS_DISORDER PKT END:");
#endif

	/* when reaching here, cnt must not be 0 */
	sendDisorderAck(conn, pkt->seq, conn->conn_info.seq - 1, lostPktCnt);
}

/*
 * handleAckForDisorderPkt
 * 		Called by sender to deal with acks for disorder packet.
 *
 * A disorder message carries the triggering sequence number in pkt->seq,
 * the receiver's largest received seq in pkt->extraSeq, and an array of
 * lost sequence numbers right after the header (see handleDisorderPacket).
 * The sender retransmits the reported-lost packets, acks away the ones
 * the receiver already buffered, and shrinks the congestion window.
 *
 * Returns true when at least one buffer was acked (capacity freed), in
 * which case the caller should try to send queued buffers.
 */

static bool
handleAckForDisorderPkt(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, icpkthdr *pkt)
{

	ICBufferLink *link = NULL;
	ICBuffer *buf = NULL;
	ICBufferLink *next = NULL;
	uint64 now = getCurrentTime();
	uint32 *curLostPktSeq = 0;
	int lostPktCnt = 0;
	static uint32 times = 0;
	static uint32 lastSeq = 0;
	bool shouldSendBuffers = false;

	/*
	 * Rate-limit the reaction: only act on the third consecutive disorder
	 * message carrying the same extraSeq ('times' reaches 2 on that one);
	 * earlier and later duplicates are ignored.  Similar in spirit to
	 * TCP's retransmit on repeated duplicate acks.
	 * NOTE(review): 'times'/'lastSeq' are function-static and therefore
	 * shared across all connections of this process — confirm this is
	 * intended.
	 */
	if (pkt->extraSeq != lastSeq)
	{
		lastSeq = pkt->extraSeq;
		times = 0;
		return false;
	}
	else
	{
		times++;
		if (times != 2)
			return false;
	}

	/* the lost-seq array follows the packet header */
	curLostPktSeq = (uint32 *) &pkt[1];
	lostPktCnt = (pkt->len - sizeof(icpkthdr)) / sizeof(uint32);

	/* Resend all the missed packets and remove received packets from queues
	 */

	link = icBufferListFirst(&conn->unackQueue);
	buf = GET_ICBUFFER_FROM_PRIMARY(link);

#ifdef AMS_VERBOSE_LOGGING
	write_log("DISORDER: pktlen %d cnt %d pktseq %d first loss %d buf %p", pkt->len, lostPktCnt, pkt->seq, *curLostPktSeq, buf);
	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		icBufferListLog(&conn->unackQueue);
		icBufferListLog(&conn->sndQueue);
	}
#endif

	/*
	 * iterate the unack queue
	 */
	while (!icBufferListIsHead(&conn->unackQueue, link) && buf->pkt->seq <= pkt->seq && lostPktCnt > 0)
	{
#ifdef AMS_VERBOSE_LOGGING
		write_log("DISORDER: bufseq %d curlostpkt %d cnt %d buf %p pkt->seq %d", buf->pkt->seq, *curLostPktSeq, lostPktCnt, buf, pkt->seq);
#endif

		/*
		 * reached the packet whose arrival triggered the disorder
		 * message: it was received, so ack it and stop.
		 */
		if (buf->pkt->seq == pkt->seq)
		{
			handleAckedPacket(conn, buf, now);
			shouldSendBuffers = true;
			break;
		}

		if (buf->pkt->seq == *curLostPktSeq)
		{
			/* this is a lost packet, retransmit */

			buf->nRetry++;
			if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
			{
				/* reschedule the buffer further along the expiration ring */
				buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
				putIntoUnackQueueRing(&unack_queue_ring, buf,
						computeExpirationPeriod(buf->conn, buf->nRetry), now);
			}
#ifdef TRANSFER_PROTOCOL_STATS
			updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif

			sendOnce(transportStates, pEntry, buf, buf->conn);

#ifdef AMS_VERBOSE_LOGGING
			write_log("RESEND a buffer for DISORDER: seq %d", buf->pkt->seq);
			logPkt("DISORDER RESEND DETAIL ", buf->pkt);
#endif

			ic_statistics.retransmits++;
			curLostPktSeq++;
			lostPktCnt--;

			link = link->next;
			buf = GET_ICBUFFER_FROM_PRIMARY(link);
		}
		else if (buf->pkt->seq < *curLostPktSeq)
		{
			/* remove packet already received. */

			next = link->next;
			handleAckedPacket(conn, buf, now);
			shouldSendBuffers = true;
			link = next;
			buf = GET_ICBUFFER_FROM_PRIMARY(link);
		}
		else /* buf->pkt->seq > *curPktSeq */
		{
			/* this case is introduced when the disorder message tell
			 * you a pkt is lost. But when we handle this message, a
			 * message (for example, duplicate ack, or another disorder message)
			 * arriving before this message already removed the pkt.
			 */
			curLostPktSeq++;
			lostPktCnt--;
		}
	}

	/* packet loss detected: multiplicative decrease of the congestion window */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		snd_control_info.ssthresh = Max(snd_control_info.cwnd/2, snd_control_info.minCwnd);
		snd_control_info.cwnd = snd_control_info.ssthresh;
	}
#ifdef AMS_VERBOSE_LOGGING
	write_log("After DISORDER: sndQ %d unackQ %d", icBufferListLength(&conn->sndQueue), icBufferListLength(&conn->unackQueue));
	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		icBufferListLog(&conn->unackQueue);
		icBufferListLog(&conn->sndQueue);
	}
#endif

	return shouldSendBuffers;
}

/*
 * handleAckForDuplicatePkt
 * 		Called by sender to deal with acks for duplicate packet.
 *
 * pkt->seq is the sequence number of the duplicated packet; pkt->extraSeq
 * is presumably the highest continuously received seq on the receiver
 * side — the first loop below acks everything up to it, and the second
 * acks the single packet with seq itself.
 *
 * Returns true when at least one buffer was acked (capacity freed), so
 * the caller knows to try sending queued buffers.
 */
static bool
handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt)
{
	ICBufferLink *link = NULL;
	ICBuffer *buf = NULL;
	ICBufferLink *next = NULL;
	uint64 now = getCurrentTime();
	bool shouldSendBuffers = false;

#ifdef AMS_VERBOSE_LOGGING
	write_log("RESEND the unacked buffers in the queue due to %s", pkt->len == 0 ? "PROCESS_START_RACE" : "DISORDER");
#endif

	/* a duplicate's seq must lie beyond the continuously-received range */
	if (pkt->seq <= pkt->extraSeq)
	{
		/* Indicate a bug here. */
		write_log("ERROR: invalid duplicate message: seq %d extraSeq %d", pkt->seq, pkt->extraSeq);
		return false;
	}

	link = icBufferListFirst(&conn->unackQueue);
	buf = GET_ICBUFFER_FROM_PRIMARY(link);

	/* deal with continuous pkts */
	while (!icBufferListIsHead(&conn->unackQueue, link) && (buf->pkt->seq <= pkt->extraSeq))
	{
		next = link->next;
		handleAckedPacket(conn, buf, now);
		shouldSendBuffers = true;
		link = next;
		buf = GET_ICBUFFER_FROM_PRIMARY(link);
	}

	/* deal with the single duplicate packet */
	while (!icBufferListIsHead(&conn->unackQueue, link) && buf->pkt->seq <= pkt->seq)
	{
		next = link->next;
		if (buf->pkt->seq == pkt->seq)
		{
			handleAckedPacket(conn, buf, now);
			shouldSendBuffers = true;
			break;
		}
		link = next;
		buf = GET_ICBUFFER_FROM_PRIMARY(link);
	}

	return shouldSendBuffers;
}

/*
 * checkNetworkTimeout
 *		Raise an error when a packet has been retried many times and has
 *		been outstanding longer than the transmit timeout.
 *
 * The elapsed time since the first transmission is not a sufficient
 * signal by itself: the sender process may have been descheduled or
 * blocked by the OS for a long stretch, in which case only a few retries
 * were actually attempted.  Hence the retry-count threshold
 * (Gp_interconnect_min_retries_before_timeout) must also be exceeded.
 */
static inline void
checkNetworkTimeout(ICBuffer *buf, uint64 now)
{
	uint64		elapsed = now - buf->sentTime;
	uint64		limit = (uint64) Gp_interconnect_transmit_timeout * 1000 * 1000;

	if (buf->nRetry > Gp_interconnect_min_retries_before_timeout &&
		elapsed > limit)
	{
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect encountered a network error, please check your network"),
						errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry, Gp_interconnect_transmit_timeout)));
	}
}

/*
 * checkExpiration
 * 		Check whether packets expire. If a packet expires, resend the packet,
 * 		and adjust its position in the unack queue ring.
 *
 * Walks the unack queue ring slot by slot (each slot covers TIMER_SPAN of
 * time) for every slot that is now in the past, resending each buffer
 * found there and rescheduling it further along the ring with an increased
 * retry count.  Any retransmission collapses the congestion window.
 * NOTE(review): triggerConn is unused in this function.
 */
static void
checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now)
{
	/* check for expiration */
	int count = 0;
	int retransmits = 0;

	/* visit at most UNACK_QUEUE_RING_SLOTS_NUM expired slots */
	while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM)
	{
		/* expired, need to resend them */
		ICBuffer *curBuf = NULL;
		while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL)
		{
			curBuf->nRetry++;
			/* reschedule with a retry-dependent expiration period */
			putIntoUnackQueueRing(
					&unack_queue_ring,
					curBuf,
					computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now);

#ifdef TRANSFER_PROTOCOL_STATS
			updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt);
#endif

			sendOnce(transportStates, pEntry, curBuf, curBuf->conn);

			retransmits++;
			ic_statistics.retransmits++;
			curBuf->conn->stat_count_resent++;
			curBuf->conn->stat_max_resent = Max(curBuf->conn->stat_max_resent, curBuf->conn->stat_count_resent);

			/* give up with an error after too many retries over too long */
			checkNetworkTimeout(curBuf, now);

		#ifdef AMS_VERBOSE_LOGGING
			write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route);
			logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
		#endif
		}

		/* advance the ring clock and move to the next slot */
		unack_queue_ring.currentTime += TIMER_SPAN;
		unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM);
	}

	/*
	 * deal with case when there is a long time this function is not called.
	 * NOTE(review): this resynchronizes the ring clock to 'now'
	 * unconditionally, even after a normal pass of the loop above —
	 * confirm that skipping over any still-pending slots between
	 * currentTime and 'now' is intended.
	 */
	unack_queue_ring.currentTime = now - (now % TIMER_SPAN);
	if (retransmits > 0 )
	{
		/* loss detected: halve ssthresh and restart from the minimum cwnd */
		snd_control_info.ssthresh = Max(snd_control_info.cwnd/2, snd_control_info.minCwnd);
		snd_control_info.cwnd = snd_control_info.minCwnd;
	}
}

/*
 * checkDeadlock
 * 		Check whether deadlock occurs on a connection.
 *
 * When the connection has not received any acks for a while, send a
 * status query message to the receiver's rx thread.  This guards against
 * a deadlock caused by continuous ack losses: packet resending alone
 * does not help, because buffers may already have left the unack queue
 * once the sender learned they were buffered at the receiver.
 *
 * On deadlock-check period: potential deadlocks are rare — in our
 * experiments they occurred only under fault injection with heavy
 * packet/ack loss — so a relatively large check period is used.
 */
static void
checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	uint64		deadlockCheckTime;
	uint64		now;

	/*
	 * Only a stalled connection can deadlock: data still queued to send,
	 * but no capacity left and no outstanding packet whose ack could
	 * replenish it.
	 */
	if (icBufferListLength(&conn->unackQueue) != 0 ||
		conn->capacity != 0 ||
		icBufferListLength(&conn->sndQueue) <= 0)
		return;

	/* we must have received some acks before deadlock occurs. */
	Assert(conn->deadlockCheckBeginTime > 0);

#ifdef USE_ASSERT_CHECKING
	if (udp_testmode)
	{
		/* short period so tests can exercise this path quickly */
		deadlockCheckTime = 100000;
	}
	else
#endif
	{
		deadlockCheckTime = DEADLOCK_CHECKING_TIME;
	}

	now = getCurrentTime();

	/* request the capacity to avoid the deadlock case */
	if (((now - ic_control_info.lastDeadlockCheckTime) > deadlockCheckTime) &&
		((now - conn->deadlockCheckBeginTime) > deadlockCheckTime))
	{
		sendStatusQueryMessage(conn, pEntry->txfd, conn->conn_info.seq - 1);
		ic_control_info.lastDeadlockCheckTime = now;
		ic_statistics.statusQueryMsgNum++;

		/* check network error. */
		if ((now - conn->deadlockCheckBeginTime) > ((uint64)Gp_interconnect_transmit_timeout * 1000 * 1000))
		{
			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg("Interconnect encountered a network error, please check your network"),
							errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds", conn->remoteHostAndPort, conn->conn_info.dstPid, conn->conn_info.dstContentId, Gp_interconnect_transmit_timeout)));
		}
	}
}

/*
 * pollAcks
 * 		Timeout polling of acks
 *
 * Waits up to 'timeout' milliseconds for the ack socket 'fd' to become
 * readable.  Returns true when there is an ack (possibly a stop message)
 * ready to read; false on timeout or EINTR.  Any other poll() failure
 * raises an error (after checking for pending interrupts).
 */
static inline bool
pollAcks(ChunkTransportState *transportStates, int fd, int timeout)
{
	struct pollfd nfd;
	int n;

	nfd.fd = fd;
	nfd.events = POLLIN;
	nfd.revents = 0;

	n = poll(&nfd, 1, timeout);
	if (n < 0)
	{
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
		if (errno == EINTR)
			return false;

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error waiting for peer ack"),
						errdetail("During poll() call.\n")));

		/* not reached */
	}

	if (n == 0) /* timeout */
	{
		return false;
	}

	/*
	 * got an ack to handle (possibly a stop message)
	 *
	 * BUGFIX: test 'revents' (what poll() reported), not 'events' (the
	 * mask we requested).  The old test of nfd.events was always true,
	 * so error-only wakeups (POLLERR/POLLHUP without POLLIN) were
	 * mistakenly reported as readable.
	 */
	if (n == 1 && (nfd.revents & POLLIN))
	{
		return true;
	}

	return false;

}

/*
 * updateRetransmitStatistics
 * 		Bump the global and per-connection retransmission counters.
 */
static inline void
updateRetransmitStatistics(MotionConn *conn)
{
	ic_statistics.retransmits++;

	conn->stat_count_resent++;
	if (conn->stat_count_resent > conn->stat_max_resent)
		conn->stat_max_resent = conn->stat_count_resent;
}

/*
 * checkExpirationCapacityFC
 * 		Check expiration for capacity based flow control method.
 *
 * If nothing has been sent for the given timeout (ms) and there are
 * still unacknowledged buffers, retransmit the oldest one and update
 * the retransmission statistics.
 */
static void
checkExpirationCapacityFC(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int timeout)
{
	/* nothing outstanding, nothing can expire */
	if (icBufferListLength(&conn->unackQueue) == 0)
		return;

	uint64 now = getCurrentTime();

	/* timeout is in ms; scale it to the unit used by getCurrentTime() */
	if (now - ic_control_info.lastPacketSendTime < ((uint64) timeout * 1000))
		return;

	/* retransmit the oldest unacked buffer */
	ICBufferLink *head = icBufferListFirst(&conn->unackQueue);
	ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(head);

	sendOnce(transportStates, pEntry, buf, buf->conn);
	buf->nRetry++;
	ic_control_info.lastPacketSendTime = now;

	updateRetransmitStatistics(conn);
	checkNetworkTimeout(buf, now);
}

/*
 * checkExceptions
 * 		Check exceptions including packet expiration, deadlock, bg thread error, NIC failure...
 *
 * 'retry' is the caller's attempt counter; the more expensive checks run
 * only on every 4th (retry & 0x3) or 64th (retry & 0x3f) call so the
 * common path stays cheap.
 */
static void
checkExceptions(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int retry, int timeout)
{
	/* capacity-based flow control: retransmit on ack timeout */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY/* || conn->state == mcsSetupOutgoingConnection*/)
		checkExpirationCapacityFC(transportStates, pEntry, conn, timeout);

	/* loss-based flow control: run the expiration ring periodically */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		uint64 now = getCurrentTime();

		if (now - ic_control_info.lastExpirationCheckTime > TIMER_CHECKING_PERIOD)
		{
			checkExpiration(transportStates, pEntry, conn, now);
			ic_control_info.lastExpirationCheckTime = now;
		}
	}

	/* periodic deadlock / rx-thread-error / interrupt checks */
	if ((retry & 0x3) == 2)
	{
		checkDeadlock(pEntry, conn);
		checkRxThreadError();
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
	}

	/* NIC on master (and thus the QD connection) may become bad, check it. */
	/* We check modulo 2 in case that 'retry' all less than 0x3f */
	if ((retry & 0x3f) == 2)
		checkQDConnectionAlive();
}

/*
 * computeTimeout
 * 		Compute the poll timeout (in ms) used while waiting for acks.
 *
 * Returns 0 (poll without blocking) on the very first attempt for a
 * fresh buffer, the periodic checking interval when there is nothing
 * outstanding or loss-based flow control is active, and a retry-scaled
 * timeout for capacity-based flow control.
 */
static inline int
computeTimeout(MotionConn *conn, int retry)
{
	ICBufferLink *head;
	ICBuffer   *oldest;

	if (icBufferListLength(&conn->unackQueue) == 0)
		return TIMER_CHECKING_PERIOD;

	head = icBufferListFirst(&conn->unackQueue);
	oldest = GET_ICBUFFER_FROM_PRIMARY(head);

	/* first attempt on a never-retried buffer: don't block */
	if (oldest->nRetry == 0 && retry == 0)
		return 0;

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
		return TIMER_CHECKING_PERIOD;

	/* for capacity based flow control */
	return TIMEOUT(oldest->nRetry);
}

/*
 * SendChunkUDP
 * 		is used to send a tcItem to a single destination. Tuples often are
 * 		*very small* we aggregate in our local buffer before sending into the kernel.
 *
 * PARAMETERS
 *	 conn - MotionConn that the tcItem is to be sent to.
 *	 tcItem - message to be sent.
 *	 motionId - Node Motion Id.
 *
 * RETURNS
 *	 true in all cases (failures are raised via ereport, not the return
 *	 value); returns early with true when a stop request deactivated the
 *	 connection.
 *
 * If the chunk fits into the connection's current buffer it is simply
 * appended.  Otherwise the full buffer is queued and transmitted, and
 * the function blocks — polling for acks and running the exception
 * checks — until a free send buffer is available, then copies the chunk
 * into the fresh buffer.
 */
static bool
SendChunkUDP(MotionLayerState *mlStates,
					  ChunkTransportState *transportStates,
					  ChunkTransportStateEntry *pEntry,
					  MotionConn *conn,
					  TupleChunkListItem tcItem,
					  int16 motionId)
{

	/* chunks are packed at TUPLE_CHUNK_ALIGN boundaries in the packet */
	int		length=TYPEALIGN(TUPLE_CHUNK_ALIGN, tcItem->chunk_length);
	int		retry = 0;
	bool	doCheckExpiration = false;
	bool	gotStops = false;

	Assert(conn->msgSize > 0);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG3, "sendChunk: msgSize %d this chunk length %d conn seq %d", conn->msgSize, tcItem->chunk_length, conn->conn_info.seq);
#endif

	/* fast path: chunk fits into the current packet, just append it */
	if (conn->msgSize + length <= Gp_max_packet_size)
	{
		memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
		conn->msgSize += length;

		conn->tupleCount++;
		return true;
	}

	/* prepare this for transmit */

	ic_statistics.totalCapacity += conn->capacity;
	ic_statistics.capacityCountingTime++;

	/* try to send it */

	prepareXmit(conn);

	/* queue the filled buffer and push out whatever capacity allows */
	icBufferListAppend(&conn->sndQueue, conn->curBuff);
	sendBuffers(transportStates, pEntry, conn);

	uint64 now = getCurrentTime();

	/* force an expiration pass if the timer has not run for a long time */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
		doCheckExpiration = false;
	else
		doCheckExpiration = (now - ic_control_info.lastExpirationCheckTime) > MAX_TIME_NO_TIMER_CHECKING ? true : false;

	/* get a new buffer */
	conn->curBuff = NULL;
	conn->pBuff = NULL;

	ic_control_info.lastPacketSendTime = 0;
	conn->deadlockCheckBeginTime = now;

	/* wait for a free send buffer, servicing acks while we wait */
	while (doCheckExpiration || (conn->curBuff = getSndBuffer(conn)) == NULL)
	{
		int timeout =  (doCheckExpiration ? 0 : computeTimeout(conn, retry));

		if (pollAcks(transportStates, pEntry->txfd, timeout))
		{
			if (handleAcks(transportStates, pEntry))
			{
				/* We make sure that we deal with the stop messages
				 * only after we get a buffer. Otherwise, if the stop
				 * message is not for this connection, this will lead
				 * to an error for the following data sending of this
				 * connection.
				 */
				gotStops = true;
			}
		}
		checkExceptions(transportStates, pEntry, conn, ++retry, timeout);
		doCheckExpiration = false;
	}

	conn->pBuff = (uint8 *) conn->curBuff->pkt;

	if (gotStops)
	{
		/* handling stop message will make some connection not active anymore */
		handleStopMsgs(transportStates, pEntry, motionId);
		gotStops = false;
		if (!conn->stillActive)
			return true;
	}

	/* reinitialize connection */
	conn->tupleCount = 0;
	conn->msgSize = sizeof(conn->conn_info);

	/* now we can copy the input to the new buffer */
	memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
	conn->msgSize += length;

	conn->tupleCount++;

	return true;
}

/*
 * SendEosUDP
 * 		broadcast eos messages to receivers.
 *
 * Runs in two phases: first the EOS-flagged buffer of every active
 * connection is queued and transmitted; then the function waits for all
 * queues to drain, servicing acks and exception checks.  Keeping the
 * phases separate speeds things up when many connections are slow or
 * lossy (observed in fault injection tests).
 *
 * See ml_ipc.h
 *
 */
static void
SendEosUDP(MotionLayerState *mlStates,
		   ChunkTransportState *transportStates,
		   int motNodeID,
		   TupleChunkListItem tcItem)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn;
	int			i = 0;
	int			retry = 0;
	int			activeCount = 0;
	int			timeout = 0;

	if (!transportStates)
	{
		elog(FATAL, "SendEosUDP: missing interconnect context.");
	}
	else if (!transportStates->activated && !transportStates->teardownActive)
	{
		elog(FATAL, "SendEosUDP: context and teardown inactive.");
	}
#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "entering seneosudp");
#endif

	/* check em' */
	ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect seg%d slice%d sending end-of-stream to slice%d",
			 GetQEIndex(), motNodeID, pEntry->recvSlice->sliceIndex);

	/* we want to add our tcItem onto each of the outgoing buffers --
	 * this is guaranteed to leave things in a state where a flush is
	 * *required*.
	 */
	doBroadcast(mlStates, transportStates, pEntry, tcItem, NULL);

	pEntry->sendingEos = true;

	uint64 now = getCurrentTime();

	/* Phase 1: now flush all of the buffers. */
	for (i = 0; i < pEntry->numConns; i++)
	{
		conn = pEntry->conns + i;

		if (conn->stillActive)
		{
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "sent eos to route %d tuplecount %d seq %d flags 0x%x stillActive %s icId %d %d",
						conn->route, conn->tupleCount, conn->conn_info.seq, conn->conn_info.flags, (conn->stillActive ? "true" : "false"), conn->conn_info.icId, conn->msgSize);

			/* prepare this for transmit */
			if (pEntry->sendingEos)
				conn->conn_info.flags |= UDPIC_FLAGS_EOS;

			prepareXmit(conn);

			/* place it into the send queue */
			icBufferListAppend(&conn->sndQueue, conn->curBuff);
			sendBuffers(transportStates, pEntry, conn);

			/* reset the connection; the buffer is now owned by the queues */
			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);
			conn->curBuff = NULL;
			conn->deadlockCheckBeginTime = now;

			activeCount++;
		}
	}

	/*
	 * Phase 2: now waiting for acks from receivers.
	 *
	 * Note here waiting is done in a separate phase from the EOS sending phase
	 * to make the processing faster when a lot of connections are slow and have
	 * frequent packet losses. In fault injection tests, we found this.
	 *
	 */

	while (activeCount > 0)
	{
		activeCount = 0;

		for (i = 0; i < pEntry->numConns; i++)
		{
			conn = pEntry->conns + i;

			if (conn->stillActive)
			{
				retry = 0;
				ic_control_info.lastPacketSendTime = 0;

				/* wait until this queue is emptied */
				while (icBufferListLength(&conn->unackQueue) > 0 || icBufferListLength(&conn->sndQueue) > 0)
				{
					timeout =  computeTimeout(conn, retry);

					if (pollAcks(transportStates, pEntry->txfd, timeout))
						handleAcks(transportStates, pEntry);

					checkExceptions(transportStates, pEntry, conn, ++retry, timeout);

					/* give up for now; the outer loop revisits this conn */
					if (retry >= MAX_TRY)
						break;
				}
			}

			/* fully drained connections are done; others keep us looping */
			if (icBufferListLength(&conn->unackQueue) == 0 && icBufferListLength(&conn->sndQueue) == 0)
			{
				conn->state = mcsEosSent;
				conn->stillActive = false;
			}
			else
			{
				activeCount++;
			}
		}
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "SendEosUDP leaving, activeCount %d", activeCount);

  if (Debug_print_execution_detail) {
    instr_time  time;
    INSTR_TIME_SET_CURRENT(time);
    elog(DEBUG1,"The time before quit SendEosUDP: %.3f ms, activeCount %d",
                     1000.0 * INSTR_TIME_GET_DOUBLE(time), activeCount);
  }
}

/*
 * doSendStopMessageUDP
 * 		Send stop messages to all senders.
 *
 * For every still-active incoming connection of the given motion node:
 *  - if an EOS packet is already queued, the stream is finished anyway;
 *    just deactivate the connection and drop its queued packets,
 *  - otherwise mark the connection stop-requested and send a STOP ack
 *    to the sender (unless the peer address is not known yet, which
 *    happens when no packet has arrived on the connection).
 *
 * ic_control_info.lock is held across the whole scan because the rx
 * thread accesses the same connection state; it is acquired exactly once
 * before the loop and released exactly once after it.
 */
static void
doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID)
{
	ChunkTransportStateEntry	*pEntry = NULL;
	MotionConn			*conn = NULL;
	int			i;

	if (!transportStates->activated)
		return;

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	Assert(pEntry);

	/*
	 * Note: we're only concerned with receivers here.
	 */
	pthread_mutex_lock(&ic_control_info.lock);

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect needs no more input from slice%d; notifying senders to stop.",
			 motNodeID);

	for (i = 0; i < pEntry->numConns; i++)
	{
		conn = pEntry->conns + i;

		/* Note here, the stillActive flag of a connection may have been
		 * set to false by markUDPConnInactive.
		 */
		if (conn->stillActive)
		{
			if (conn->conn_info.flags & UDPIC_FLAGS_EOS)
			{
				/* we have a queued packet that has EOS in it. We've acked it, so we're done */
				if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
					elog(DEBUG1, "do sendstop: already have queued EOS packet, we're done. node %d route %d", motNodeID, i);

				conn->stillActive = false;

				/* need to drop the queues in the teardown function. */
				while (conn->pkt_q_size > 0)
				{
					putRxBufferAndSendAck(conn, NULL);
				}
			}
			else
			{
				conn->stopRequested = true;
				conn->conn_info.flags |= UDPIC_FLAGS_STOP;

				/*
				 * The peer addresses for incoming connections will not be set until
				 * the first packet has arrived. However, when the lower slice does not have data to send,
				 * the corresponding peer address for the incoming connection will never be set.
				 * We will skip sending ACKs to those connections.
				 */

#ifdef FAULT_INJECTOR
				if (FaultInjector_InjectFaultIfSet(
												   InterconnectStopAckIsLost,
												   DDLNotSpecified,
												   "" /* databaseName */,
												   "" /* tableName */) == FaultInjectorTypeSkip)
				{
					/*
					 * Simulate a lost stop ack by skipping the send only.
					 * BUGFIX: keep holding ic_control_info.lock here — it is
					 * taken once before the loop and released once after it.
					 * The old code unlocked before 'continue', leaving the
					 * rest of the scan running without the lock and ending
					 * with a second unlock of an already-unlocked mutex.
					 */
					continue;
				}
#endif

				if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6)
				{
					/* ack the last packet we received (seq - 1, clamped at 0) */
					uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
					sendAck(conn, UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, seq, seq);

					if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
						elog(DEBUG1, "sent stop message. node %d route %d seq %d", motNodeID, i, seq);
				}
				else
				{
					if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
						elog(DEBUG1, "first packet did not arrive yet. don't sent stop message. node %d route %d", motNodeID, i);
				}
			}
		}
	}
	pthread_mutex_unlock(&ic_control_info.lock);
}

/*
 * formatSockAddr
 * 		Format a sockaddr (IPv4, IPv6, or unknown family) as a printable
 * 		"host:port" string into the caller-supplied buffer.
 *
 * sa      - address to format
 * buf     - caller-supplied output buffer (filled in place)
 * bufsize - size of buf in bytes
 *
 * NOTE(review): in the IPv6 path below, buf is advanced past the host part
 * before the final snprintf of the port, so the returned pointer does not
 * point at the start of the formatted string in that case.  Callers should
 * rely on the buffer they passed in, not on the return value.
 *
 * NOTE: Because this function can be called in a thread (rxThreadFunc),
 * it must not use services such as elog, ereport, palloc/pfree and StringInfo.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 */
char *
formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
{
	/* Save remote host:port string for error messages. */
	if (sa->sa_family == AF_INET)
	{
		struct sockaddr_in *	sin = (struct sockaddr_in *)sa;
		uint32					saddr = ntohl(sin->sin_addr.s_addr);

		/* Format the four octets in dotted-quad form, then the port. */
		snprintf(buf, bufsize, "%d.%d.%d.%d:%d",
				 (saddr >> 24)&0xff,
				 (saddr >> 16)&0xff,
				 (saddr >> 8)&0xff,
				 saddr&0xff,
				 ntohs(sin->sin_port));
	}
#ifdef HAVE_IPV6
	else if (sa->sa_family == AF_INET6)
	{
		char remote_port[32];
		remote_port[0] = '\0';
		buf[0] = '\0';

		/* Need at least room for "[x]:ppppp" plus terminators. */
		if (bufsize > 10)
		{
			buf[0] = '[';
			buf[1] = '\0'; /* in case getnameinfo fails */
			/*
			 * inet_ntop isn't portable.
			 * //inet_ntop(AF_INET6, &sin6->sin6_addr, buf, bufsize - 8);
			 *
			 * postgres has a standard routine for converting addresses to printable format,
			 * which works for IPv6, IPv4, and Unix domain sockets.  I've changed this
			 * routine to use that, but I think the entire formatSockAddr routine could
			 * be replaced with it.
			 */
			int ret = pg_getnameinfo_all((const struct sockaddr_storage *)sa, sizeof(struct sockaddr_in6),
							   buf+1, bufsize-10,
							   remote_port, sizeof(remote_port),
							   NI_NUMERICHOST | NI_NUMERICSERV);
			if (ret != 0)
			{
				write_log("getnameinfo returned %d: %s, and says %s port %s",ret,gai_strerror(ret),buf,remote_port);
				/*
				 * Fall back to using our internal inet_ntop routine, which really is for inet datatype
				 * This is because of a bug in solaris, where getnameinfo sometimes fails
				 * Once we find out why, we can remove this
				 *
				 * NOTE(review): sin6_port is formatted here without ntohs(),
				 * unlike the IPv4 path above — this fallback log line would
				 * show the port in network byte order; confirm before use.
				 */
				snprintf(remote_port,sizeof(remote_port),"%d",((struct sockaddr_in6 *)sa)->sin6_port);
				/*
				 * This is nasty: our internal inet_net_ntop takes PGSQL_AF_INET6, not AF_INET6, which
				 * is very odd... They are NOT the same value (even though PGSQL_AF_INET == AF_INET
				 */
#define PGSQL_AF_INET6	(AF_INET + 1)
				inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf+1, bufsize-10);
				write_log("Our alternative method says %s]:%s",buf,remote_port);

			}
			/* Advance past the host part and close the bracket. */
			buf += strlen(buf);
			strcat(buf,"]");
			buf++;
		}
		snprintf(buf, 8, ":%s", remote_port);
	}
#endif
	else
		snprintf(buf, bufsize, "?host?:?port?");

	return buf;
}								/* formatSockAddr */

/*
 * getCurrentTime
 * 		get current time
 *
 */
static uint64
getCurrentTime(void)
{
	struct timeval newTime;
	int status = 1;
	uint64 t = 0;

#if HAVE_LIBRT
	/* Use clock_gettime to return monotonic time value. */
	struct timespec ts;
	status = clock_gettime(CLOCK_MONOTONIC, &ts);

	newTime.tv_sec = ts.tv_sec;
	newTime.tv_usec = ts.tv_nsec / 1000;

#endif

	if (status != 0)
		gettimeofday(&newTime, NULL);

	t = ((uint64)newTime.tv_sec) * USECS_PER_SECOND + newTime.tv_usec;
	return t;
}

/*
 * putIntoUnackQueueRing
 * 		Put the buffer into the unack queue ring, in the slot corresponding
 * 		to its retransmit expiration time.
 *
 * uqr     - the ring to insert into
 * buf     - the buffer to track until it is acknowledged
 * expTime - expiration time from now
 * now     - current time
 */
static void
putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now)
{
	uint64 diff = now + expTime - uqr->currentTime;
	int idx = 0;

	/* Clamp the offset into [TIMER_SPAN, UNACK_QUEUE_RING_LENGTH). */
	if (diff >= UNACK_QUEUE_RING_LENGTH)
	{
	#ifdef AMS_VERBOSE_LOGGING
		write_log("putIntoUnackQueueRing:""now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
	#endif
		diff = UNACK_QUEUE_RING_LENGTH - 1;
	}
	else if (diff < TIMER_SPAN)
	{
		diff = TIMER_SPAN;
	}

	idx = (uqr->idx + diff/TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;

#ifdef AMS_VERBOSE_LOGGING
	write_log("PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT " (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, uqr->idx, idx);
#endif

	buf->unackQueueRingSlot = idx;

	/*
	 * Append into the ring passed by the caller.  The previous code appended
	 * to the global unack_queue_ring directly, silently ignoring the uqr
	 * parameter; the in-file caller passes &unack_queue_ring, so using uqr
	 * here is behavior-preserving while making the function honor its
	 * signature.
	 */
	icBufferListAppend(&uqr->slots[idx], buf);
}

/*
 * handleDataPacket
 * 		Handle an incoming data packet for an already-matched connection.
 *
 * Called from the rx thread with ic_control_info.lock held.
 *
 * conn    - the receiving connection matched from the packet header
 * pkt     - the packet just read from the socket
 * peer    - sender address of the packet
 * peerlen - length of the sender address
 * param   - out: ack parameters; the caller sends the actual ack after
 *           releasing the lock, to shorten the lock holding time
 *
 * Returns true when the packet buffer has been stored into conn->pkt_q
 * (ownership moves to the queue; the caller must stop using the buffer).
 * Returns false when the packet was fully handled here (status query,
 * duplicate, out-of-range, stop-handshake ...) and the buffer can be reused.
 *
 * NOTE: runs in the rx thread, so it must use write_log, never elog.
 */
static bool
handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param)
{

	/* A header-only packet carrying UDPIC_FLAGS_CAPACITY is a status query:
	 * answer with our current ack state and drop it. */
	if ((pkt->len == sizeof(icpkthdr)) && (pkt->flags & UDPIC_FLAGS_CAPACITY))
	{
		if (DEBUG1 >= log_min_messages)
			write_log("status queuy message received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);

	#ifdef AMS_VERBOSE_LOGGING
		logPkt("STATUS QUERY MESSAGE", pkt);
	#endif
		/* seq - 1 is the last sequence number actually received. */
		uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
		uint32 extraSeq = conn->stopRequested ? seq : conn->conn_info.extraSeq;
		setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq);

		return false;
	}

	/*
	 * when we're not doing a full-setup on every
	 * statement, we've got to update the peer info --
	 * full setups do this at setup-time.
	 */

	/*
	 * Note the change here, for process start race and disordered message,
	 * if we do not fill in peer address, then we may send some acks to unknown address.
	 * Thus, the following condition is used.
	 *
	 */
	if (pkt->seq <= Gp_interconnect_queue_depth)
	{
		/* fill in the peer.  Need to cast away "volatile".  ugly */
		memset((void *)&conn->peer, 0, sizeof(conn->peer));
		memcpy((void *)&conn->peer, peer, *peerlen);
		conn->peer_len = *peerlen;

		conn->conn_info.dstListenerPort = pkt->dstListenerPort;
		if (DEBUG2 >= log_min_messages)
			write_log("received the head packets when eliding setup, pkt seq %d", pkt->seq);
	}

	/* data packet */
	if (pkt->flags & UDPIC_FLAGS_EOS)
	{
		if (DEBUG3 >= log_min_messages)
			write_log("received packet with EOS motid %d route %d seq %d",
					  pkt->motNodeId, conn->route, pkt->seq);
	}

	/*
	 * if we got a stop, but didn't request a stop --
	 * ignore, this is a startup blip: we must have
	 * acked with a stop -- we don't want to do
	 * anything further with the stop-message if we
	 * didn't request a stop!
	 *
	 * this is especially important after
	 * eliding setup is enabled.
	 */
	if (!conn->stopRequested && (pkt->flags & UDPIC_FLAGS_STOP))
	{
		if (pkt->flags & UDPIC_FLAGS_EOS)
		{
			write_log("non-requested stop flag, EOS! seq %d, flags 0x%x", pkt->seq, pkt->flags);
		}
		return false;
	}

	/* Stop handshake: we asked the sender to stop; ack with STOP until the
	 * sender confirms with EOS, at which point the connection goes inactive. */
	if (conn->stopRequested && conn->stillActive)
	{
		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG && DEBUG5 >= log_min_messages)
			write_log("rx_thread got packet on active connection marked stopRequested. "
					  "(flags 0x%x) node %d route %d pkt seq %d conn seq %d",
					  pkt->flags, pkt->motNodeId, conn->route, pkt->seq, conn->conn_info.seq);

		/* can we update stillActive ? */
		if (DEBUG2 >= log_min_messages)
			if (!(pkt->flags & UDPIC_FLAGS_STOP) &&
				!(pkt->flags & UDPIC_FLAGS_EOS))
				write_log("stop requested but no stop flag on return packet ?!");

		if (pkt->flags & UDPIC_FLAGS_EOS)
			conn->conn_info.flags |= UDPIC_FLAGS_EOS;

		if (conn->conn_info.seq < pkt->seq)
			conn->conn_info.seq = pkt->seq; /* note here */

		setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);

		/* we only update stillActive if eos has been sent by peer. */
		if (pkt->flags & UDPIC_FLAGS_EOS)
		{
			if (DEBUG2 >= log_min_messages)
				write_log("stop requested and acknowledged by sending peer");
			conn->stillActive = false;
		}

		return false;
	}

	/* dropped ack or timeout: sender retransmitted something we already
	 * have; re-ack so it can advance. */
	if (pkt->seq < conn->conn_info.seq)
	{
		ic_statistics.duplicatedPktNum++;
		if (DEBUG3 >= log_min_messages)
			write_log("dropped ack ? ignored data packet w/ cmd %d conn->cmd %d node %d route %d seq %d expected %d flags 0x%x",
					  pkt->icId, conn->conn_info.icId, pkt->motNodeId,
					  conn->route, pkt->seq, conn->conn_info.seq, pkt->flags);
		setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);

		return false;
	}

	/* sequence number is correct */
	if (!conn->stillActive)
	{
		/* peer may have dropped ack */
		if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE &&
			DEBUG1 >= log_min_messages)
			write_log("received on inactive connection node %d route %d (seq %d pkt->seq %d)",
					  pkt->motNodeId, conn->route, conn->conn_info.seq, pkt->seq);
		if (conn->conn_info.seq < pkt->seq)
			conn->conn_info.seq = pkt->seq;
		setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);

		return false;
	}

	/* headSeq is the seq for the head packet. */
	uint32 headSeq = conn->conn_info.seq - conn->pkt_q_size;
	if ((conn->pkt_q_size == Gp_interconnect_queue_depth) || (pkt->seq - headSeq >= Gp_interconnect_queue_depth))
	{
		/*
		 * Error case: NO RX SPACE or out of range pkt
		 * This indicates a bug.
		 */
		logPkt("Interconnect error: received a packet when the queue is full ", pkt);
		ic_statistics.disorderedPktNum++;
		conn->stat_count_dropped++;
		return false;
	}

	/* put the packet at its position in the circular queue */
	bool toWakeup = false;

	int pos = (pkt->seq - 1) % Gp_interconnect_queue_depth;
	if (conn->pkt_q[pos] == NULL)
	{
		conn->pkt_q[pos] = (uint8 *)pkt;
		/* the head packet just arrived: the main thread may be waiting on it */
		if (pos == conn->pkt_q_head)
		{
		#ifdef AMS_VERBOSE_LOGGING
			write_log("SAVE pkt at QUEUE HEAD [seq %d] for node %d route %d, queue head seq %d, queue size %d, queue head %d queue tail %d", pkt->seq, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
		#endif
			toWakeup = true;
		}

		if (pos == conn->pkt_q_tail)
		{
			/* move the queue tail: absorb any previously out-of-order packets
			 * that are now contiguous, bumping conn_info.seq for each one */
			for(;conn->pkt_q[conn->pkt_q_tail] != NULL && conn->pkt_q_size < Gp_interconnect_queue_depth;)
			{
				conn->pkt_q_size++;
				conn->pkt_q_tail = (conn->pkt_q_tail + 1) % Gp_interconnect_queue_depth;
				conn->conn_info.seq++;
			}

			/* set the EOS flag if the newest queued packet carries EOS */
			if (((icpkthdr *)(conn->pkt_q[(conn->pkt_q_tail + Gp_interconnect_queue_depth - 1) % Gp_interconnect_queue_depth]))->flags & UDPIC_FLAGS_EOS)
			{
				conn->conn_info.flags |= UDPIC_FLAGS_EOS;
				if (DEBUG1 >= log_min_messages)
					write_log("RX_THREAD: the packet with EOS flag is available for access in the queue for route %d", conn->route);
			}

			/* ack data packet */
			setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);

			#ifdef AMS_VERBOSE_LOGGING
				write_log("SAVE conn %p pkt at QUEUE TAIL [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
			#endif
		}
		else /* deal with out-of-order packet */
		{
			if (DEBUG1 >= log_min_messages)
				write_log("SAVE conn %p OUT-OF-ORDER pkt [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);

			/* send an ack for out-of-order packet */
			ic_statistics.disorderedPktNum++;
			handleDisorderPacket(conn, pos, headSeq + conn->pkt_q_size, pkt);
		}
	}
	else /* duplicate pkt */
	{
		if (DEBUG1 >= log_min_messages)
			write_log("DUPLICATE pkt [seq %d], [head seq] %d, queue size %d, queue head %d queue tail %d", pkt->seq, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);

		setAckSendParam(param, conn, UDPIC_FLAGS_DUPLICATE | conn->conn_info.flags, pkt->seq, conn->conn_info.seq - 1);
		ic_statistics.duplicatedPktNum++;
		return false;
	}

	/* Was the main thread waiting for something ?  Only wake it when the
	 * queue-head packet it is blocked on has just arrived (toWakeup). */
	if (rx_control_info.mainWaitingState.waiting &&
			rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId &&
			rx_control_info.mainWaitingState.waitingQuery == pkt->icId && toWakeup)
	{
		if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
		{
			if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
				rx_control_info.mainWaitingState.reachRoute = conn->route;
		}
		else if (rx_control_info.mainWaitingState.waitingRoute == conn->route)
		{
			if (DEBUG2 >= log_min_messages)
				write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
			rx_control_info.mainWaitingState.reachRoute = conn->route;
		}
		/* WAKE MAIN THREAD HERE */
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
		udpSignal(&ic_control_info.usig);
#else
		pthread_cond_signal(&ic_control_info.cond);
#endif
	}

	return true;
}

/*
 * rxThreadFunc
 * 		Main function of the receive background thread.
 *
 * Loops forever: polls the UDP listener socket, reads packets into buffers
 * drawn from rx_buffer_pool, and dispatches each one (under
 * ic_control_info.lock) either to its matched connection via
 * handleDataPacket or to handleMismatch.  Acks are sent after the lock is
 * released.  The loop exits when ic_control_info.shutdown is set.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe.  Developers should instead use something like:
 *
 *	if (DEBUG3 >= log_min_messages)
 *		write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static void *
rxThreadFunc(void *arg)
{
	icpkthdr *pkt=NULL;
	bool	skip_poll=false;

	gp_set_thread_sigmasks();

	for (;;)
	{
		struct pollfd nfd;
		int		n;

		/* check shutdown condition*/

		if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
		{
			if (DEBUG1 >= log_min_messages)
			{
				write_log("udp-ic: rx-thread shutting down");
			}
			break;
		}

		/* Try to get a buffer */
		if (pkt == NULL)
		{
			pthread_mutex_lock(&ic_control_info.lock);
			pkt = getRxBuffer(&rx_buffer_pool);
			pthread_mutex_unlock(&ic_control_info.lock);

			if (pkt == NULL)
			{
				setRxThreadError(ENOMEM);
				continue;
			}
		}

		if (!skip_poll)
		{
			/* Do we have inbound traffic to handle ?*/
			nfd.fd = UDP_listenerFd;
			nfd.events = POLLIN;

			n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT);

			if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
			{
				if (DEBUG1 >= log_min_messages)
				{
					write_log("udp-ic: rx-thread shutting down");
				}
				break;
			}

			if (n < 0)
			{
				if (errno == EINTR)
					continue;

				/*
				 * ERROR case: if simply break out the loop here, there will be a hung here,
				 * since main thread will never be waken up, and senders will not
				 * get responses anymore.
				 *
				 * Thus, we set an error flag, and let main thread to report an error.
				 */
				setRxThreadError(errno);
				continue;
			}

			if (n == 0)
				continue;
		}

		/*
		 * Check revents (what actually happened), not events (what we asked
		 * for).  The old code tested nfd.events & POLLIN, which is always
		 * true since we request POLLIN above, so error conditions reported
		 * in revents (POLLERR/POLLHUP/POLLNVAL) were misread as readable.
		 */
		if (skip_poll || (n == 1 && (nfd.revents & POLLIN)))
		{
			/* we've got something interesting to read */
			/* handle incoming */
			/* ready to read on our socket */
			MotionConn *conn = NULL;
			int read_count = 0;

			struct sockaddr_storage peer;
			socklen_t peerlen;

			peerlen = sizeof(peer);
			read_count = recvfrom(UDP_listenerFd, (char *)pkt, Gp_max_packet_size, 0,
								  (struct sockaddr *)&peer, &peerlen);

			if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
			{
				if (DEBUG1 >= log_min_messages)
				{
					write_log("udp-ic: rx-thread shutting down");
				}
				break;
			}

			if (DEBUG5 >= log_min_messages)
				write_log("received inbound len %d", read_count);

			if (read_count < 0)
			{
				skip_poll = false;

				if (errno == EWOULDBLOCK || errno == EINTR)
					continue;

				write_log("Interconnect error: recvfrom (%d)", errno);
				/*
				 * ERROR case: if simply break out the loop here, there will be a hung here,
				 * since main thread will never be waken up, and senders will not
				 * get responses anymore.
				 *
				 * Thus, we set an error flag, and let main thread to report an error.
				 */
				setRxThreadError(errno);
				continue;
			}

			if (read_count < sizeof(icpkthdr))
			{
				if (DEBUG1 >= log_min_messages)
					write_log("Interconnect error: short conn receive (%d)", read_count);
				continue;
			}

			/* when we get a "good" recvfrom() result, we can skip poll() until we get a bad one. */
			skip_poll = true;

			/* length must be >= 0 */
			if (pkt->len < 0)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound with negative length");
				continue;
			}

			/* truncated or padded datagram: drop it */
			if (pkt->len != read_count)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound packet [%d], short: read %d bytes, pkt->len %d", pkt->seq, read_count, pkt->len);
				continue;
			}

			/*
			 * check the CRC of the payload.
			 */
			if (gp_interconnect_full_crc)
			{
				if (!checkCRC(pkt))
				{
					gp_atomic_add_32(&ic_statistics.crcErrors, 1);
					if (DEBUG2 >= log_min_messages)
						write_log("received network data error, dropping bad packet, user data unaffected.");
					continue;
				}
			}

			#ifdef AMS_VERBOSE_LOGGING
				logPkt("GOT MESSAGE", pkt);
			#endif

			AckSendParam param;
			memset(&param, 0, sizeof(AckSendParam));

			/*
			 * Get the connection for the pkt.
			 *
			 * 	The connection hash table should be locked until
			 * 	finishing the processing of the packet to avoid
			 *  the connection addition/removal from the hash table
			 *  during the mean time.
			 */

			pthread_mutex_lock(&ic_control_info.lock);
			conn = findConnByHeader(&ic_control_info.connHtab, pkt);

			if (conn != NULL)
			{
				/* Handling a regular packet; a true return means the buffer
				 * was queued and we must allocate a fresh one next loop. */
				if (handleDataPacket(conn, pkt, &peer, &peerlen, &param))
					pkt = NULL;
				ic_statistics.recvPktNum++;
			}
			else
			{
				/*
				 * There may have two kinds of Mismatched packets:
				 *    a) Past packets from previous command after I was torn down
				 *    b) Future packets from current command before my connections are built.
				 *
				 * The handling logic is to "Ack the past and Nak the future".
				 */
				if ((pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER) == 0)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("mismatched packet received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);

				#ifdef AMS_VERBOSE_LOGGING
					logPkt("Got a Mismatched Packet", pkt);
				#endif

					if (handleMismatch(pkt, &peer, peerlen))
						pkt = NULL;
					ic_statistics.mismatchNum++;
				}
			}
			pthread_mutex_unlock(&ic_control_info.lock);

			/* real ack sending is after lock release to decrease the lock holding time. */
			if (param.msg.len != 0)
				sendAckWithParam(&param);
		}

		/* pthread_yield(); */
	}

	/* Before return, we release the packet. */
	if (pkt)
	{
		pthread_mutex_lock(&ic_control_info.lock);
		freeRxBuffer(&rx_buffer_pool, pkt);
		pkt = NULL;
		pthread_mutex_unlock(&ic_control_info.lock);
	}

	/* nothing to return */
	return NULL;
}

/*
 * handleMismatch
 * 		If the mismatched packet is from an old connection, we may need to
 * 		send an acknowledgment.
 *
 * We are called with the receiver-lock held, and we never release it.
 *
 * Returns true when the packet buffer was cached for later use (ownership
 * transfers to the startup cache); false when the caller may reuse it.
 *
 * For QD:
 * 1) Not in hashtable     : NAK it/Do nothing
 * 	  Causes:  a) Start race
 * 	           b) Before the entry for the ic instance is inserted, an error happened.
 * 	           c) From past transactions: should not happen.
 * 2) Active in hashtable  : NAK it/Do nothing
 *    Causes:  a) Error reported after the entry is inserted, and connections are
 *                not inserted to the hashtable yet, and before teardown is called.
 * 3) Inactive in hashtable: ACK it (with stop)
 *    Causes: a) Normal execution: after teardown is called on current command.
 *            b) Error case, 2a) after teardown is called.
 *            c) Normal execution: from past history transactions (should not happen).
 *
 * For QE:
 * 1) pkt->id > Gp_interconnect_id : NAK it/Do nothing
 *    Causes: a) Start race
 *            b) Before Gp_interconnect_id is assigned to correct value, an error happened.
 * 2) lastTornIcId < pkt->id == Gp_interconnect_id: NAK it/Do nothing
 *    Causes:  a) Error reported after Gp_interconnect_id is set, and connections are
 *                not inserted to the hashtable yet, and before teardown is called.
 * 3) lastTornIcId == pkt->id == Gp_interconnect_id: ACK it (with stop)
 *    Causes:  a) Normal execution: after teardown is called on current command
 * 4) pkt->id < Gp_interconnect_id: NAK it/Do nothing/ACK it.
 *    Causes:  a) Should not happen.
 *
 * NOTE: runs in the rx thread; uses write_log, never elog.
 */
static bool
handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	bool cached = false;

	/*
	 * we want to ack old packets; but *must* avoid acking connection requests:
	 *
	 *	 "ACK the past, NAK the future" explicit NAKs aren't necessary, we just don't
	 *	 want to ACK future packets, that confuses everyone.
	 */
	if (pkt->seq > 0 && pkt->sessionId == gp_session_id)
	{
		bool need_ack=false;
		uint8 ack_flags=0;

		/*
		 * The QD-backends can't use a counter, they've potentially got multiple instances (one for each active cursor)
		 */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			struct CursorICHistoryEntry *p;

			p = getCursorIcEntry(&rx_control_info.cursorHistoryTable, pkt->icId);
			if (p)
			{
				if (p->status == 0)
				{
					/* Torn down. Ack the past. */
					need_ack = true;
				}
				else /* p->status == 1 */
				{
					/*
					 * Not torn down yet.
					 * It happens when an error (out-of-memory, network error...) occurred
					 * after the cursor entry is inserted into the table in interconnect setup process.
					 * The peer will be canceled.
					 */
					if (DEBUG1 >= log_min_messages)
						write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY THINKS IT IS ACTIVE", pkt->icId);
					return cached; /* ignore, no ack */
				}
			}
			else
			{
				if (DEBUG1 >= log_min_messages)
					write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY HAS NO RECORD", pkt->icId);

				/*
				 * No record means that two possibilities.
				 * 1) It is from the future. It is due to startup race. We do not ack future packets
				 * 2) Before the entry for the ic instance is inserted, an error happened. We do not
				 *    ack for this case too. The peer will be canceled.
				 */
				ack_flags = UDPIC_FLAGS_NAK;
				need_ack = false;

				if (gp_interconnect_cache_future_packets)
				{
					cached = cacheFuturePacket(pkt, peer, peer_len);
				}
			}
		}
		/* The QEs get to use a simple counter. */
		else if (Gp_role == GP_ROLE_EXECUTE)
		{
			if (gp_interconnect_id >= pkt->icId)
			{
				need_ack = true;

				/*
				 * We want to "ACK the past, but NAK the future."
				 *
				 * handleAck() will retransmit.
				 */
				if (pkt->seq >= 1 && pkt->icId > rx_control_info.lastTornIcId)
				{
					ack_flags = UDPIC_FLAGS_NAK;
					need_ack = false;
				}
			}
			else /* gp_interconnect_id < pkt->icId, from the future */
			{
				if (gp_interconnect_cache_future_packets)
				{
					cached = cacheFuturePacket(pkt, peer, peer_len);
				}
			}
		}

		if (need_ack)
		{
			/* stack-local stand-in connection, used only to carry the header
			 * and peer address into sendAck */
			MotionConn dummyconn;
			char buf[128];	/* numeric IP addresses shouldn't exceed about 50 chars, but play it safe */


			memcpy(&dummyconn.conn_info, pkt, sizeof(icpkthdr));
			dummyconn.peer = *peer;
			dummyconn.peer_len = peer_len;

			dummyconn.conn_info.flags |= ack_flags;

			if (DEBUG1 >= log_min_messages)
				write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
					  pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, gp_interconnect_id);

			formatSockAddr((struct sockaddr *)&dummyconn.peer, buf, sizeof(buf));

			if (DEBUG1 >= log_min_messages)
				write_log("ACKING PACKET TO %s", buf);

			if ((ack_flags & UDPIC_FLAGS_NAK) == 0)
			{
				ack_flags |= UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_RECEIVER_TO_SENDER;
			}
			else
			{
				ack_flags |= UDPIC_FLAGS_RECEIVER_TO_SENDER;
			}
			/*
			 * There are two cases, we may need to send a response to sender here.
			 * One is start race and the other is receiver becomes idle.
			 *
			 * ack_flags here can take two possible values
			 * 1) UDPIC_FLAGS_NAK | UDPIC_FLAGS_RECEIVER_TO_SENDER (for start race)
			 * 2) UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_RECEIVER_TO_SENDER (for idle receiver)
			 *
			 * The final flags in the packet may take some extra bits such as
			 * 1) UDPIC_FLAGS_STOP
			 * 2) UDPIC_FLAGS_EOS
			 * 3) UDPIC_FLAGS_CAPACITY
			 * which are from original packet
			 */
			sendAck(&dummyconn, ack_flags | dummyconn.conn_info.flags, dummyconn.conn_info.seq, dummyconn.conn_info.seq);
		}
	}
	else
	{
		if (DEBUG1 >= log_min_messages)
			write_log("dropping packet from command-id %d seq %d (my cmd %d)", pkt->icId, pkt->seq, gp_interconnect_id);
	}

	return cached;
}

/*
 * cacheFuturePacket
 *		Cache the future packets during the setupUdpInterconnect.
 *
 * A placeholder MotionConn (keyed by the packet header) is created on first
 * sight of a sender and registered in startupCacheHtab; subsequent packets
 * from the same sender are slotted into its pkt_q by sequence number.
 *
 * Return true if packet is cached (ownership of the buffer moves to the
 * cache), otherwise false.
 */
static bool
cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	MotionConn *cachedConn = findConnByHeader(&ic_control_info.startupCacheHtab, pkt);

	if (cachedConn == NULL)
	{
		/* First packet from this sender: build a zeroed placeholder. */
		cachedConn = calloc(1, sizeof(MotionConn));
		if (cachedConn == NULL)
		{
			setRxThreadError(errno);
			return false;
		}

		memcpy(&cachedConn->conn_info, pkt, sizeof(icpkthdr));

		/* The queue array is used purely as packet-cache slots. */
		cachedConn->pkt_q_size = Gp_interconnect_queue_depth;
		cachedConn->pkt_q = (uint8 **) calloc(Gp_interconnect_queue_depth, sizeof(uint8 *));
		if (cachedConn->pkt_q == NULL)
		{
			free(cachedConn);
			setRxThreadError(errno);
			return false;
		}

		/* Register the placeholder connection in the startup cache. */
		if (!connAddHash(&ic_control_info.startupCacheHtab, cachedConn))
		{
			free(cachedConn->pkt_q);
			free(cachedConn);
			setRxThreadError(errno);
			return false;
		}

		/* Remember the sender's address. */
		memcpy(&cachedConn->peer, peer, peer_len);
		cachedConn->peer_len = peer_len;
	}

	/* Reject invalid sequence numbers and already-cached duplicates. */
	if (pkt->seq == 0 || pkt->seq > cachedConn->pkt_q_size ||
		cachedConn->pkt_q[pkt->seq - 1] != NULL)
		return false;

	cachedConn->pkt_q[pkt->seq - 1] = (uint8 *) pkt;
	rx_buffer_pool.maxCount++;
	ic_statistics.startupCachedPktNum++;
	return true;
}

/*
 * cleanupStartupCache
 *		Clean the startup cache.
 *
 * Walks every bin of startupCacheHtab, returning each cached packet buffer
 * to the rx free list (undoing the maxCount bump made by cacheFuturePacket),
 * then removes and frees each cached placeholder connection.
 */
static void
cleanupStartupCache()
{
	ConnHtabBin *bin = NULL;
	MotionConn *cachedConn = NULL;
	icpkthdr *pkt = NULL;
	int i = 0;
	int j = 0;

	for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
	{
		bin = ic_control_info.startupCacheHtab.table[i];

		while (bin)
		{
			cachedConn = bin->conn;

			/* Release every packet buffer slotted into this cached conn. */
			for (j = 0; j < cachedConn->pkt_q_size; j++)
			{
				pkt = (icpkthdr *) cachedConn->pkt_q[j];

				if (pkt == NULL)
					continue;

				/* Undo the maxCount increment from cacheFuturePacket. */
				rx_buffer_pool.maxCount--;

				putRxBufferToFreeList(&rx_buffer_pool, pkt);
				cachedConn->pkt_q[j] = NULL;
			}
			/*
			 * Advance BEFORE deleting: connDelHash unlinks (and presumably
			 * frees — TODO confirm) the current bin, so bin->next must be
			 * read first.  Order matters here.
			 */
			bin = bin->next;
			connDelHash(&ic_control_info.startupCacheHtab, cachedConn);

			/* MPP-19981
			 * free the cached connections; otherwise memory leak
			 * would be introduced.
			 */
			free(cachedConn->pkt_q);
			free(cachedConn);
		}
	}
}


/* The following functions are facility methods for debugging.
 * They are quite useful when there are a large number of connections.
 * These functions can be called from gdb to output internal information to a file.
 */

/*
 * dumpICBufferList_Internal
 * 		Write a human-readable dump of every buffer in the list to ofile.
 *
 * Debug-only helper; walks at most list->length nodes to guard against a
 * corrupted (cyclic) list.
 */
static void
dumpICBufferList_Internal(ICBufferList *list, FILE *ofile)
{
	ICBufferLink *link = list->head.next;
	int remaining = list->length;
	int nodeno = 0;

	fprintf(ofile, "List Length %d\n", remaining);

	for (; link != &list->head && remaining > 0; link = link->next, remaining--)
	{
		ICBuffer *buf;

		/* The owning ICBuffer is recovered from whichever link field
		 * this list threads through. */
		if (list->type == ICBufferListType_Primary)
			buf = GET_ICBUFFER_FROM_PRIMARY(link);
		else
			buf = GET_ICBUFFER_FROM_SECONDARY(link);

		fprintf(ofile, "Node %d, linkptr %p ", nodeno++, link);
		fprintf(ofile, "Packet Content [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				buf->pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				buf->pkt->seq, buf->pkt->extraSeq, buf->pkt->motNodeId, buf->pkt->crc, buf->pkt->len,
				buf->pkt->srcContentId, buf->pkt->dstContentId,
				buf->pkt->srcPid, buf->pkt->dstPid,
				buf->pkt->srcListenerPort, buf->pkt->dstListenerPort,
				buf->pkt->sendSliceIndex, buf->pkt->recvSliceIndex,
				buf->pkt->sessionId, buf->pkt->icId,
				buf->pkt->flags);
	}
}


/*
 * dumpICBufferList
 * 		Dump a buffer list.
 *
 * Opens (truncating) the named file and writes the list contents to it.
 * Intended to be called from gdb while debugging; silently does nothing
 * if the file cannot be opened, instead of crashing on a NULL stream.
 */
void
dumpICBufferList(ICBufferList *list, const char *fname)
{
	FILE *ofile = fopen(fname, "w+");

	/* Guard: fopen may fail (bad path, permissions); avoid NULL deref. */
	if (ofile == NULL)
		return;

	dumpICBufferList_Internal(list, ofile);
	fclose(ofile);
}

/*
 * dumpUnackQueueRing
 * 		Dump an unack queue ring.
 *
 * Writes the global unack_queue_ring header fields and the contents of
 * every non-empty slot to the named file. Debugging aid (callable from
 * gdb); silently does nothing if the file cannot be opened.
 */
void
dumpUnackQueueRing(const char *fname)
{
	FILE *ofile = fopen(fname, "w+");
	int i;

	/* Guard: fopen may fail; avoid passing NULL to fprintf/fclose. */
	if (ofile == NULL)
		return;

	fprintf(ofile, "UnackQueueRing: currentTime " UINT64_FORMAT ", idx %d numOutstanding %d numSharedOutstanding %d\n",
			unack_queue_ring.currentTime, unack_queue_ring.idx,
			unack_queue_ring.numOutStanding, unack_queue_ring.numSharedOutStanding);
	fprintf(ofile, "==================================\n");
	for (i = 0; i < UNACK_QUEUE_RING_SLOTS_NUM; i++)
	{
		if (icBufferListLength(&unack_queue_ring.slots[i]) > 0)
		{
			dumpICBufferList_Internal(&unack_queue_ring.slots[i], ofile);
		}
	}

	fclose(ofile);
}

/*
 * dumpConnections
 * 		Dump connections.
 *
 * Writes the state of every primary connection in the given transport
 * entry to the named file: the connection bookkeeping fields, the
 * connection's conn_info header, and — on the receiver side — each
 * cached packet in the queue; on the sender side, the send and unack
 * queues. Debugging aid (callable from gdb); silently does nothing if
 * the file cannot be opened.
 */
void
dumpConnections(ChunkTransportStateEntry *pEntry, const char *fname)
{
	int			i, j;
	MotionConn *conn;

	FILE *ofile = fopen(fname, "w+");

	/* Guard: fopen may fail; avoid passing NULL to fprintf/fclose. */
	if (ofile == NULL)
		return;

	fprintf(ofile, "Entry connections: conn num %d \n", pEntry->numPrimaryConns);
	fprintf(ofile, "==================================\n");

	for (i = 0; i < pEntry->numPrimaryConns; i++)
	{
		conn = &pEntry->conns[i];

		fprintf(ofile, "conns[%d] motNodeId=%d: remoteContentId=%d pid=%d sockfd=%d remote=%s local=%s "
				"capacity=%d sentSeq=%d receivedAckSeq=%d consumedSeq=%d rtt=" UINT64_FORMAT
				" dev=" UINT64_FORMAT " deadlockCheckBeginTime=" UINT64_FORMAT " route=%d msgSize=%d msgPos=%p"
				" recvBytes=%d tupleCount=%d waitEOS=%d stillActive=%d stopRequested=%d "
				"state=%d\n",
				 i, pEntry->motNodeId,
				 conn->remoteContentId,
				 conn->cdbProc ? conn->cdbProc->pid : 0,
				 conn->sockfd,
				 conn->remoteHostAndPort,
				 conn->localHostAndPort,
				 conn->capacity, conn->sentSeq, conn->receivedAckSeq, conn->consumedSeq,
				 conn->rtt, conn->dev, conn->deadlockCheckBeginTime, conn->route, conn->msgSize, conn->msgPos,
				 conn->recvBytes, conn->tupleCount, conn->waitEOS, conn->stillActive, conn->stopRequested,
				 conn->state);
		fprintf(ofile, "conn_info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				conn->conn_info.flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				conn->conn_info.seq, conn->conn_info.extraSeq, conn->conn_info.motNodeId, conn->conn_info.crc, conn->conn_info.len,
				conn->conn_info.srcContentId, conn->conn_info.dstContentId,
				conn->conn_info.srcPid, conn->conn_info.dstPid,
				conn->conn_info.srcListenerPort, conn->conn_info.dstListenerPort,
				conn->conn_info.sendSliceIndex, conn->conn_info.recvSliceIndex,
				conn->conn_info.sessionId, conn->conn_info.icId,
				conn->conn_info.flags);

		if (!ic_control_info.isSender)
		{
			fprintf(ofile, "pkt_q_size=%d pkt_q_head=%d pkt_q_tail=%d pkt_q=%p\n", conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q);
			/*
			 * NOTE(review): the bound here is Gp_interconnect_queue_depth
			 * rather than conn->pkt_q_size — this assumes every receive
			 * queue is allocated with Gp_interconnect_queue_depth slots;
			 * confirm against the allocation sites if they can differ.
			 */
			for (j = 0; j < Gp_interconnect_queue_depth; j++)
			{
				if (conn->pkt_q != NULL && conn->pkt_q[j] != NULL)
				{
					icpkthdr *pkt = (icpkthdr *) conn->pkt_q[j];
					fprintf(ofile, "Packet (pos %d) Info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
							"srcContentId %d dstDesContentId %d "
							"srcPid %d dstPid %d "
							"srcListenerPort %d dstListernerPort %d "
							"sendSliceIndex %d recvSliceIndex %d "
							"sessionId %d icId %d "
							"flags %d\n",
							j,
							pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
							pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
							pkt->srcContentId, pkt->dstContentId,
							pkt->srcPid, pkt->dstPid,
							pkt->srcListenerPort, pkt->dstListenerPort,
							pkt->sendSliceIndex, pkt->recvSliceIndex,
							pkt->sessionId, pkt->icId,
							pkt->flags);
				}
			}
		}
		if (ic_control_info.isSender)
		{
			fprintf(ofile, "sndQueue ");
			dumpICBufferList_Internal(&conn->sndQueue, ofile);
			fprintf(ofile, "unackQueue ");
			dumpICBufferList_Internal(&conn->unackQueue, ofile);
		}
		fprintf(ofile, "\n");
	}
	fclose(ofile);
}

/*
 * WaitInterconnectQuitUDP
 *		Shut down the interconnect background (rx) thread and wait for
 *		it to exit.
 *
 * No-op in utility mode, where no interconnect thread exists. The
 * ordering below matters: release the locks first, flag shutdown, then
 * nudge the thread before joining it.
 */
void
WaitInterconnectQuitUDP(void)
{
	if (Gp_role == GP_ROLE_UTILITY)
	{
		return;	
	}

	/*
	 * Just in case ic thread is waiting on the locks.
	*/
	pthread_mutex_unlock(&ic_control_info.errorLock);
	pthread_mutex_unlock(&ic_control_info.lock);

	/* Atomically flip the shutdown flag 0 -> 1 so the rx thread sees it. */
	compare_and_swap_32(&ic_control_info.shutdown, 0, 1);

	if (ic_control_info.threadCreated)
	{
		/*
		 * SendDummyPacket presumably wakes the rx thread if it is blocked
		 * on the socket, so the join below cannot hang — TODO confirm
		 * against the rx thread's receive loop.
		 */
		SendDummyPacket();
		pthread_join(ic_control_info.threadHandle, NULL);
	}
	ic_control_info.threadCreated = false;
}


/*
 * checkQDConnectionAlive
 *    Check whether QD connection is still alive. If not, report error.
 *
 * Validates the backend's client socket; on failure raises an
 * interconnect error whose message depends on whether this process is a
 * segment (QE) or the master (QD).
 */
void
checkQDConnectionAlive(void)
{
	/* Connection still valid: nothing to report. */
	if (dispatch_validate_conn(MyProcPort->sock))
		return;

	if (Gp_role == GP_ROLE_EXECUTE)
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error segment lost contact with master (recv)")));

	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					errmsg("Interconnect error master lost contact with client (recv)")));
}
